diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/BroadcastProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/BroadcastProcessorDemo.java index 72bb72da..7515cb51 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/BroadcastProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/BroadcastProcessorDemo.java @@ -22,12 +22,12 @@ import java.util.List; public class BroadcastProcessorDemo implements BroadcastProcessor { @Override - public ProcessResult preProcess(TaskContext context) throws Exception { + public ProcessResult preProcess(TaskContext context) { System.out.println("===== BroadcastProcessorDemo#preProcess ======"); context.getOmsLogger().info("BroadcastProcessorDemo#preProcess, current host: {}", NetUtils.getLocalHost()); if ("rootFailed".equals(context.getJobParams())) { return new ProcessResult(false, "console need failed"); - }else { + } else { return new ProcessResult(true); } } @@ -40,7 +40,7 @@ public class BroadcastProcessorDemo implements BroadcastProcessor { long sleepTime = 1000; try { sleepTime = Long.parseLong(taskContext.getJobParams()); - }catch (Exception e) { + } catch (Exception e) { logger.warn("[BroadcastProcessor] parse sleep time failed!", e); } Thread.sleep(Math.max(sleepTime, 1000)); @@ -48,7 +48,7 @@ public class BroadcastProcessorDemo implements BroadcastProcessor { } @Override - public ProcessResult postProcess(TaskContext context, List taskResults) throws Exception { + public ProcessResult postProcess(TaskContext context, List taskResults) { System.out.println("===== BroadcastProcessorDemo#postProcess ======"); context.getOmsLogger().info("BroadcastProcessorDemo#postProcess, current host: {}, taskResult: {}", NetUtils.getLocalHost(), taskResults); return new ProcessResult(true, "success"); diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapProcessorDemo.java index e089532d..e9f03276 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapProcessorDemo.java @@ -39,13 +39,13 @@ public class MapProcessorDemo implements MapProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { - System.out.println("============== MapProcessorDemo#process =============="); - System.out.println("isRootTask:" + isRootTask()); - System.out.println("taskContext:" + JsonUtils.toJSONString(context)); - System.out.println(mysteryService.hasaki()); + log.info("============== MapProcessorDemo#process =============="); + log.info("isRootTask:{}", isRootTask()); + log.info("taskContext:{}", JsonUtils.toJSONString(context)); + log.info("{}", mysteryService.hasaki()); if (isRootTask()) { - System.out.println("==== MAP ===="); + log.info("==== MAP ===="); List subTasks = Lists.newLinkedList(); for (int j = 0; j < BATCH_NUM; j++) { SubTask subTask = new SubTask(); @@ -60,16 +60,16 @@ public class MapProcessorDemo implements MapProcessor { return new ProcessResult(true, "map successfully"); } else { - System.out.println("==== PROCESS ===="); + log.info("==== PROCESS ===="); SubTask subTask = (SubTask) context.getSubTask(); for (Integer itemId : subTask.getItemIds()) { if (Thread.interrupted()) { // 任务被中断 - System.out.println("job has been stop! so stop to process subTask:" + subTask.getSiteId() + "=>" + itemId); + log.info("job has been stop! so stop to process subTask: {} => {}", subTask.getSiteId(), itemId); break; } - System.out.println("processing subTask: " + subTask.getSiteId() + "=>" + itemId); - int max = Integer.MAX_VALUE >> 4; + log.info("processing subTask: {} => {}", subTask.getSiteId(), itemId); + int max = Integer.MAX_VALUE >> 7; for (int i = 0; ; i++) { // 模拟耗时操作 if (i > max) { @@ -80,6 +80,10 @@ public class MapProcessorDemo implements MapProcessor { // 测试在 Map 任务中追加上下文 context.getWorkflowContext().appendData2WfContext("Yasuo", "A sword's poor company for a long road."); boolean b = ThreadLocalRandom.current().nextBoolean(); + if (context.getCurrentRetryTimes() >= 1) { + // 重试的话一定会成功 + b = true; + } return new ProcessResult(b, "RESULT:" + b); } } diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java index b761a770..215a95a0 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java @@ -35,9 +35,9 @@ public class MapReduceProcessorDemo implements MapReduceProcessor { OmsLogger omsLogger = context.getOmsLogger(); - System.out.println("============== TestMapReduceProcessor#process =============="); - System.out.println("isRootTask:" + isRootTask()); - System.out.println("taskContext:" + JsonUtils.toJSONString(context)); + log.info("============== TestMapReduceProcessor#process =============="); + log.info("isRootTask:{}", isRootTask()); + log.info("taskContext:{}", JsonUtils.toJSONString(context)); // 根据控制台参数获取MR批次及子任务大小 final JSONObject jobParams = JSONObject.parseObject(context.getJobParams()); @@ -46,7 +46,7 @@ public class MapReduceProcessorDemo implements MapReduceProcessor { Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10); if (isRootTask()) { - System.out.println("==== MAP ===="); + log.info("==== MAP ===="); omsLogger.info("[DemoMRProcessor] start root task~"); List subTasks = Lists.newLinkedList(); for (int j = 0; j < batchNum; j++) { @@ -59,14 +59,14 @@ public class MapReduceProcessorDemo implements MapReduceProcessor { } omsLogger.info("[DemoMRProcessor] map success~"); return new ProcessResult(true, "MAP_SUCCESS"); - }else { - System.out.println("==== NORMAL_PROCESS ===="); + } else { + log.info("==== NORMAL_PROCESS ===="); omsLogger.info("[DemoMRProcessor] process subTask: {}.", JSON.toJSONString(context.getSubTask())); - System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask())); + log.info("subTask: {}", JsonUtils.toJSONString(context.getSubTask())); Thread.sleep(1000); if (context.getCurrentRetryTimes() == 0) { return new ProcessResult(false, "FIRST_FAILED"); - }else { + } else { return new ProcessResult(true, "PROCESS_SUCCESS"); } } diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/StandaloneProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/StandaloneProcessorDemo.java index 06a96a90..d039c73d 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/StandaloneProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/StandaloneProcessorDemo.java @@ -1,5 +1,6 @@ package tech.powerjob.samples.processors; +import org.apache.commons.lang3.StringUtils; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.sdk.BasicProcessor; @@ -21,25 +22,29 @@ public class StandaloneProcessorDemo implements BasicProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { - OmsLogger omsLogger = context.getOmsLogger(); omsLogger.info("StandaloneProcessorDemo start process,context is {}.", context); omsLogger.info("Notice! If you want this job process failed, your jobParams need to be 'failed'"); - omsLogger.info("Let's test the exception~"); // 测试异常日志 try { Collections.emptyList().add("277"); - }catch (Exception e) { + } catch (Exception e) { omsLogger.error("oh~it seems that we have an exception~", e); } - - System.out.println("================ StandaloneProcessorDemo#process ================"); - System.out.println(context.getJobParams()); - // 根据控制台参数判断是否成功 - boolean success = !"failed".equals(context.getJobParams()); - omsLogger.info("StandaloneProcessorDemo finished process,success: .", success); - + log.info("================ StandaloneProcessorDemo#process ================"); + log.info("jobParam:{}", context.getJobParams()); + log.info("instanceParams:{}", context.getInstanceParams()); + String param; + // 解析参数,非处于工作流中时,优先取实例参数(允许动态[instanceParams]覆盖静态参数[jobParams]) + if (context.getWorkflowContext() == null) { + param = StringUtils.isBlank(context.getInstanceParams()) ? context.getJobParams() : context.getInstanceParams(); + } else { + param = context.getJobParams(); + } + // 根据参数判断是否成功 + boolean success = !"failed".equals(param); + omsLogger.info("StandaloneProcessorDemo finished process,success: {}", success); omsLogger.info("anyway, we finished the job successfully~Congratulations!"); return new ProcessResult(success, context + ": " + success); } diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/TimeoutProcessor.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/TimeoutProcessor.java index 85c1a208..e630ae8b 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/TimeoutProcessor.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/TimeoutProcessor.java @@ -1,20 +1,24 @@ package tech.powerjob.samples.processors; +import lombok.extern.slf4j.Slf4j; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.sdk.BasicProcessor; import org.springframework.stereotype.Component; /** - * 测试超时任务 + * 测试超时任务(可中断) * * @author tjq * @since 2020/4/20 */ @Component +@Slf4j public class TimeoutProcessor implements BasicProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { + long sleepTime = Long.parseLong(context.getJobParams()); + log.info("TaskInstance({}) will sleep {} ms", context.getInstanceId(), sleepTime); Thread.sleep(Long.parseLong(context.getJobParams())); return new ProcessResult(true, "impossible~~~~QAQ~"); } diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/AppendWorkflowContextTester.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/AppendWorkflowContextTester.java index f735b400..05133263 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/AppendWorkflowContextTester.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/AppendWorkflowContextTester.java @@ -19,7 +19,6 @@ public class AppendWorkflowContextTester implements BasicProcessor { private static final String FAIL_CODE = "0"; - @Override @SuppressWarnings("squid:S106") public ProcessResult process(TaskContext context) throws Exception { diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/StopInstanceTester.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/StopInstanceTester.java index 58819b89..50fbdbdd 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/StopInstanceTester.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/StopInstanceTester.java @@ -6,20 +6,22 @@ import tech.powerjob.worker.core.processor.sdk.BasicProcessor; import org.springframework.stereotype.Component; /** - * 测试用户反馈的无法停止实例的问题 + * 测试用户反馈的无法停止实例的问题 (可中断) * https://github.com/PowerJob/PowerJob/issues/37 * * @author tjq * @since 2020/7/30 */ @Component +@SuppressWarnings("all") public class StopInstanceTester implements BasicProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { int i = 0; while (true) { System.out.println(i++); - Thread.sleep(1000*10); + // interruptable + Thread.sleep(10000L); } } } diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/StopInstanceUninterruptibleTester.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/StopInstanceUninterruptibleTester.java new file mode 100644 index 00000000..a298afb0 --- /dev/null +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/StopInstanceUninterruptibleTester.java @@ -0,0 +1,30 @@ +package tech.powerjob.samples.tester; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; + +/** + * 停止实例 (不可中断) + * + * @author Echo009 + * @since 2023/1/15 + */ +@Component +@Slf4j +@SuppressWarnings("all") +public class StopInstanceUninterruptibleTester implements BasicProcessor { + @Override + public ProcessResult process(TaskContext context) throws Exception { + int i = 0; + while (true) { + // uninterruptible + i++; + if (i % 1000000000 == 0){ + log.info("taskInstance({}) is running ...",context.getInstanceId()); + } + } + } +} diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/workflow/WorkflowStandaloneProcessor.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/workflow/WorkflowStandaloneProcessor.java index 02e51dd9..8fdb93e9 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/workflow/WorkflowStandaloneProcessor.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/workflow/WorkflowStandaloneProcessor.java @@ -1,6 +1,7 @@ package tech.powerjob.samples.workflow; import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.sdk.BasicProcessor; @@ -16,20 +17,20 @@ import java.util.Map; * @since 2020/6/2 */ @Component +@Slf4j public class WorkflowStandaloneProcessor implements BasicProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { OmsLogger logger = context.getOmsLogger(); - logger.info("current:" + context.getJobParams()); - System.out.println("jobParams: " + context.getJobParams()); - System.out.println("currentContext:"+JSON.toJSONString(context)); + logger.info("current jobParams: {}", context.getJobParams()); + logger.info("current context: {}", context.getWorkflowContext()); + log.info("jobParams:{}", context.getJobParams()); + log.info("currentContext:{}", JSON.toJSONString(context)); // 尝试获取上游任务 Map workflowContext = context.getWorkflowContext().fetchWorkflowContext(); - System.out.println("工作流上下文数据:"); - System.out.println(workflowContext); - + log.info("工作流上下文数据:{}", workflowContext); return new ProcessResult(true, context.getJobId() + " process successfully."); } }