feat: add some powerjob worker sample processores

This commit is contained in:
Echo009 2023-01-15 21:06:59 +08:00
parent 1b3134291c
commit 30abf08703
9 changed files with 86 additions and 41 deletions

View File

@ -22,7 +22,7 @@ import java.util.List;
public class BroadcastProcessorDemo implements BroadcastProcessor { public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override @Override
public ProcessResult preProcess(TaskContext context) throws Exception { public ProcessResult preProcess(TaskContext context) {
System.out.println("===== BroadcastProcessorDemo#preProcess ======"); System.out.println("===== BroadcastProcessorDemo#preProcess ======");
context.getOmsLogger().info("BroadcastProcessorDemo#preProcess, current host: {}", NetUtils.getLocalHost()); context.getOmsLogger().info("BroadcastProcessorDemo#preProcess, current host: {}", NetUtils.getLocalHost());
if ("rootFailed".equals(context.getJobParams())) { if ("rootFailed".equals(context.getJobParams())) {
@ -48,7 +48,7 @@ public class BroadcastProcessorDemo implements BroadcastProcessor {
} }
@Override @Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception { public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) {
System.out.println("===== BroadcastProcessorDemo#postProcess ======"); System.out.println("===== BroadcastProcessorDemo#postProcess ======");
context.getOmsLogger().info("BroadcastProcessorDemo#postProcess, current host: {}, taskResult: {}", NetUtils.getLocalHost(), taskResults); context.getOmsLogger().info("BroadcastProcessorDemo#postProcess, current host: {}, taskResult: {}", NetUtils.getLocalHost(), taskResults);
return new ProcessResult(true, "success"); return new ProcessResult(true, "success");

View File

@ -39,13 +39,13 @@ public class MapProcessorDemo implements MapProcessor {
@Override @Override
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {
System.out.println("============== MapProcessorDemo#process =============="); log.info("============== MapProcessorDemo#process ==============");
System.out.println("isRootTask:" + isRootTask()); log.info("isRootTask:{}", isRootTask());
System.out.println("taskContext:" + JsonUtils.toJSONString(context)); log.info("taskContext:{}", JsonUtils.toJSONString(context));
System.out.println(mysteryService.hasaki()); log.info("{}", mysteryService.hasaki());
if (isRootTask()) { if (isRootTask()) {
System.out.println("==== MAP ===="); log.info("==== MAP ====");
List<SubTask> subTasks = Lists.newLinkedList(); List<SubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < BATCH_NUM; j++) { for (int j = 0; j < BATCH_NUM; j++) {
SubTask subTask = new SubTask(); SubTask subTask = new SubTask();
@ -60,16 +60,16 @@ public class MapProcessorDemo implements MapProcessor {
return new ProcessResult(true, "map successfully"); return new ProcessResult(true, "map successfully");
} else { } else {
System.out.println("==== PROCESS ===="); log.info("==== PROCESS ====");
SubTask subTask = (SubTask) context.getSubTask(); SubTask subTask = (SubTask) context.getSubTask();
for (Integer itemId : subTask.getItemIds()) { for (Integer itemId : subTask.getItemIds()) {
if (Thread.interrupted()) { if (Thread.interrupted()) {
// 任务被中断 // 任务被中断
System.out.println("job has been stop! so stop to process subTask:" + subTask.getSiteId() + "=>" + itemId); log.info("job has been stop! so stop to process subTask: {} => {}", subTask.getSiteId(), itemId);
break; break;
} }
System.out.println("processing subTask: " + subTask.getSiteId() + "=>" + itemId); log.info("processing subTask: {} => {}", subTask.getSiteId(), itemId);
int max = Integer.MAX_VALUE >> 4; int max = Integer.MAX_VALUE >> 7;
for (int i = 0; ; i++) { for (int i = 0; ; i++) {
// 模拟耗时操作 // 模拟耗时操作
if (i > max) { if (i > max) {
@ -80,6 +80,10 @@ public class MapProcessorDemo implements MapProcessor {
// 测试在 Map 任务中追加上下文 // 测试在 Map 任务中追加上下文
context.getWorkflowContext().appendData2WfContext("Yasuo", "A sword's poor company for a long road."); context.getWorkflowContext().appendData2WfContext("Yasuo", "A sword's poor company for a long road.");
boolean b = ThreadLocalRandom.current().nextBoolean(); boolean b = ThreadLocalRandom.current().nextBoolean();
if (context.getCurrentRetryTimes() >= 1) {
// 重试的话一定会成功
b = true;
}
return new ProcessResult(b, "RESULT:" + b); return new ProcessResult(b, "RESULT:" + b);
} }
} }

View File

@ -35,9 +35,9 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
OmsLogger omsLogger = context.getOmsLogger(); OmsLogger omsLogger = context.getOmsLogger();
System.out.println("============== TestMapReduceProcessor#process =============="); log.info("============== TestMapReduceProcessor#process ==============");
System.out.println("isRootTask:" + isRootTask()); log.info("isRootTask:{}", isRootTask());
System.out.println("taskContext:" + JsonUtils.toJSONString(context)); log.info("taskContext:{}", JsonUtils.toJSONString(context));
// 根据控制台参数获取MR批次及子任务大小 // 根据控制台参数获取MR批次及子任务大小
final JSONObject jobParams = JSONObject.parseObject(context.getJobParams()); final JSONObject jobParams = JSONObject.parseObject(context.getJobParams());
@ -46,7 +46,7 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10); Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);
if (isRootTask()) { if (isRootTask()) {
System.out.println("==== MAP ===="); log.info("==== MAP ====");
omsLogger.info("[DemoMRProcessor] start root task~"); omsLogger.info("[DemoMRProcessor] start root task~");
List<TestSubTask> subTasks = Lists.newLinkedList(); List<TestSubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < batchNum; j++) { for (int j = 0; j < batchNum; j++) {
@ -60,9 +60,9 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
omsLogger.info("[DemoMRProcessor] map success~"); omsLogger.info("[DemoMRProcessor] map success~");
return new ProcessResult(true, "MAP_SUCCESS"); return new ProcessResult(true, "MAP_SUCCESS");
} else { } else {
System.out.println("==== NORMAL_PROCESS ===="); log.info("==== NORMAL_PROCESS ====");
omsLogger.info("[DemoMRProcessor] process subTask: {}.", JSON.toJSONString(context.getSubTask())); omsLogger.info("[DemoMRProcessor] process subTask: {}.", JSON.toJSONString(context.getSubTask()));
System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask())); log.info("subTask: {}", JsonUtils.toJSONString(context.getSubTask()));
Thread.sleep(1000); Thread.sleep(1000);
if (context.getCurrentRetryTimes() == 0) { if (context.getCurrentRetryTimes() == 0) {
return new ProcessResult(false, "FIRST_FAILED"); return new ProcessResult(false, "FIRST_FAILED");

View File

@ -1,5 +1,6 @@
package tech.powerjob.samples.processors; package tech.powerjob.samples.processors;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor; import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
@ -21,11 +22,9 @@ public class StandaloneProcessorDemo implements BasicProcessor {
@Override @Override
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger(); OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("StandaloneProcessorDemo start process,context is {}.", context); omsLogger.info("StandaloneProcessorDemo start process,context is {}.", context);
omsLogger.info("Notice! If you want this job process failed, your jobParams need to be 'failed'"); omsLogger.info("Notice! If you want this job process failed, your jobParams need to be 'failed'");
omsLogger.info("Let's test the exception~"); omsLogger.info("Let's test the exception~");
// 测试异常日志 // 测试异常日志
try { try {
@ -33,13 +32,19 @@ public class StandaloneProcessorDemo implements BasicProcessor {
} catch (Exception e) { } catch (Exception e) {
omsLogger.error("oh~it seems that we have an exception~", e); omsLogger.error("oh~it seems that we have an exception~", e);
} }
log.info("================ StandaloneProcessorDemo#process ================");
System.out.println("================ StandaloneProcessorDemo#process ================"); log.info("jobParam:{}", context.getJobParams());
System.out.println(context.getJobParams()); log.info("instanceParams:{}", context.getInstanceParams());
// 根据控制台参数判断是否成功 String param;
boolean success = !"failed".equals(context.getJobParams()); // 解析参数非处于工作流中时优先取实例参数允许动态[instanceParams]覆盖静态参数[jobParams]
omsLogger.info("StandaloneProcessorDemo finished process,success: .", success); if (context.getWorkflowContext() == null) {
param = StringUtils.isBlank(context.getInstanceParams()) ? context.getJobParams() : context.getInstanceParams();
} else {
param = context.getJobParams();
}
// 根据参数判断是否成功
boolean success = !"failed".equals(param);
omsLogger.info("StandaloneProcessorDemo finished process,success: {}", success);
omsLogger.info("anyway, we finished the job successfully~Congratulations!"); omsLogger.info("anyway, we finished the job successfully~Congratulations!");
return new ProcessResult(success, context + ": " + success); return new ProcessResult(success, context + ": " + success);
} }

View File

@ -1,20 +1,24 @@
package tech.powerjob.samples.processors; package tech.powerjob.samples.processors;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor; import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* 测试超时任务 * 测试超时任务可中断
* *
* @author tjq * @author tjq
* @since 2020/4/20 * @since 2020/4/20
*/ */
@Component @Component
@Slf4j
public class TimeoutProcessor implements BasicProcessor { public class TimeoutProcessor implements BasicProcessor {
@Override @Override
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {
long sleepTime = Long.parseLong(context.getJobParams());
log.info("TaskInstance({}) will sleep {} ms", context.getInstanceId(), sleepTime);
Thread.sleep(Long.parseLong(context.getJobParams())); Thread.sleep(Long.parseLong(context.getJobParams()));
return new ProcessResult(true, "impossible~~~~QAQ~"); return new ProcessResult(true, "impossible~~~~QAQ~");
} }

View File

@ -19,7 +19,6 @@ public class AppendWorkflowContextTester implements BasicProcessor {
private static final String FAIL_CODE = "0"; private static final String FAIL_CODE = "0";
@Override @Override
@SuppressWarnings("squid:S106") @SuppressWarnings("squid:S106")
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {

View File

@ -6,20 +6,22 @@ import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* 测试用户反馈的无法停止实例的问题 * 测试用户反馈的无法停止实例的问题 (可中断)
* https://github.com/PowerJob/PowerJob/issues/37 * https://github.com/PowerJob/PowerJob/issues/37
* *
* @author tjq * @author tjq
* @since 2020/7/30 * @since 2020/7/30
*/ */
@Component @Component
@SuppressWarnings("all")
public class StopInstanceTester implements BasicProcessor { public class StopInstanceTester implements BasicProcessor {
@Override @Override
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {
int i = 0; int i = 0;
while (true) { while (true) {
System.out.println(i++); System.out.println(i++);
Thread.sleep(1000*10); // interruptable
Thread.sleep(10000L);
} }
} }
} }

View File

@ -0,0 +1,30 @@
package tech.powerjob.samples.tester;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
/**
* 停止实例 (不可中断)
*
* @author Echo009
* @since 2023/1/15
*/
@Component
@Slf4j
@SuppressWarnings("all")
public class StopInstanceUninterruptibleTester implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
int i = 0;
while (true) {
// uninterruptible
i++;
if (i % 1000000000 == 0){
log.info("taskInstance({}) is running ...",context.getInstanceId());
}
}
}
}

View File

@ -1,6 +1,7 @@
package tech.powerjob.samples.workflow; package tech.powerjob.samples.workflow;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor; import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
@ -16,20 +17,20 @@ import java.util.Map;
* @since 2020/6/2 * @since 2020/6/2
*/ */
@Component @Component
@Slf4j
public class WorkflowStandaloneProcessor implements BasicProcessor { public class WorkflowStandaloneProcessor implements BasicProcessor {
@Override @Override
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {
OmsLogger logger = context.getOmsLogger(); OmsLogger logger = context.getOmsLogger();
logger.info("current:" + context.getJobParams()); logger.info("current jobParams: {}", context.getJobParams());
System.out.println("jobParams: " + context.getJobParams()); logger.info("current context: {}", context.getWorkflowContext());
System.out.println("currentContext:"+JSON.toJSONString(context)); log.info("jobParams:{}", context.getJobParams());
log.info("currentContext:{}", JSON.toJSONString(context));
// 尝试获取上游任务 // 尝试获取上游任务
Map<String, String> workflowContext = context.getWorkflowContext().fetchWorkflowContext(); Map<String, String> workflowContext = context.getWorkflowContext().fetchWorkflowContext();
System.out.println("工作流上下文数据:"); log.info("工作流上下文数据:{}", workflowContext);
System.out.println(workflowContext);
return new ProcessResult(true, context.getJobId() + " process successfully."); return new ProcessResult(true, context.getJobId() + " process successfully.");
} }
} }