2020-05-21 20:36:05 +08:00

653 lines
46 KiB
HTML
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<!DOCTYPE html>
<html lang="en" dir=>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="处理器概述 OhMyScheduler当前支持Shell、Python等脚本处理器和Java处理器。脚本处理器只需要开发者完成脚本的编写xxx.sh / xxx.py在控制台填入脚本内容即可本章不再赘述。本章将重点阐述Java处理器开发方法与使用技巧。
Java处理器可根据代码所处位置划分为内置Java处理器和容器Java处理器前者直接集成在宿主应用也就是接入本系统的业务应用一般用来处理业务需求后者可以在一个独立的轻量级的Java工程中开发通过容器技术详见容器章节被worker集群热加载提供Java的“脚本能力”一般用于处理灵活多变的需求。 Java处理器可根据对象创建者划分为SpringBean处理器和普通Java对象处理器前者由Spring IOC容器完成处理器的创建和初始化后者则有OhMyScheduler维护其状态。如果宿主应用支持Spring强烈建议使用SpringBean处理器开发者仅需要将Processor注册进Spring IOC容器一个@Component注解或一句bean配置。 Java处理器可根据功能划分为单机处理器、广播处理器、Map处理器和MapReduce处理器。 单机处理器BasicProcessor对应了单机任务即某个任务的某次运行只会有某一台机器的某一个线程参与运算。 广播处理器BroadcastProcessor对应了广播任务即某个任务的某次运行会调动集群内所有机器参与运算。 Map处理器MapProcessor对应了Map任务即某个任务在运行过程中允许产生子任务并分发到其他机器进行运算。 MapReduce处理器MapReduceProcessor对应了MapReduce任务在Map任务的基础上增加了所有任务结束后的汇总统计。 核心方法process 任意Java处理器都需要实现处理的核心方法其接口签名如下
ProcessResult process(TaskContext context) throws Exception; 方法入参TaskContext包含了本次处理的上下文信息具体属性如下
属性名称 意义/用法 jobId 任务ID开发者一般无需关心此参数 instanceId 任务实例ID全局唯一开发者一般无需关心此参数 subInstanceId 子任务实例ID秒级任务使用开发者一般无需关心此参数 taskId 采用链式命名法的ID在某个任务实例内唯一开发者一般无需关心此参数 taskName task名称Map/MapReduce任务的子任务的值为开发者指定否则为系统默认值开发者一般无需关心此参数 jobParams 任务参数,其值等同于控制台录入的任务参数,常用! instanceParams 任务实例参数其值等同于使用OpenAPI运行任务实例时传递的参数常用 maxRetryTimes Task的最大重试次数 currentRetryTimes Task的当前重试次数和maxRetryTimes联合起来可以判断当前是否为该Task的最后一次运行机会 subTask 子TaskMap/MapReduce处理器专属开发者调用map方法时传递的子任务列表中的某一个 omsLogger 在线日志用法同Slf4J记录的日志可以直接通过控制台查看非常便捷和强大不过使用过程中需要注意频率可能对Server造成巨大的压力 方法的返回值为ProcessResult代表了本次Task执行的结果包含success和msg两个属性分别用于传递Task是否执行成功和Task需要返回的信息。">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="处理器开发" />
<meta property="og:description" content="处理器概述 OhMyScheduler当前支持Shell、Python等脚本处理器和Java处理器。脚本处理器只需要开发者完成脚本的编写xxx.sh / xxx.py在控制台填入脚本内容即可本章不再赘述。本章将重点阐述Java处理器开发方法与使用技巧。
Java处理器可根据代码所处位置划分为内置Java处理器和容器Java处理器前者直接集成在宿主应用也就是接入本系统的业务应用一般用来处理业务需求后者可以在一个独立的轻量级的Java工程中开发通过容器技术详见容器章节被worker集群热加载提供Java的“脚本能力”一般用于处理灵活多变的需求。 Java处理器可根据对象创建者划分为SpringBean处理器和普通Java对象处理器前者由Spring IOC容器完成处理器的创建和初始化后者则有OhMyScheduler维护其状态。如果宿主应用支持Spring强烈建议使用SpringBean处理器开发者仅需要将Processor注册进Spring IOC容器一个@Component注解或一句bean配置。 Java处理器可根据功能划分为单机处理器、广播处理器、Map处理器和MapReduce处理器。 单机处理器BasicProcessor对应了单机任务即某个任务的某次运行只会有某一台机器的某一个线程参与运算。 广播处理器BroadcastProcessor对应了广播任务即某个任务的某次运行会调动集群内所有机器参与运算。 Map处理器MapProcessor对应了Map任务即某个任务在运行过程中允许产生子任务并分发到其他机器进行运算。 MapReduce处理器MapReduceProcessor对应了MapReduce任务在Map任务的基础上增加了所有任务结束后的汇总统计。 核心方法process 任意Java处理器都需要实现处理的核心方法其接口签名如下
ProcessResult process(TaskContext context) throws Exception; 方法入参TaskContext包含了本次处理的上下文信息具体属性如下
属性名称 意义/用法 jobId 任务ID开发者一般无需关心此参数 instanceId 任务实例ID全局唯一开发者一般无需关心此参数 subInstanceId 子任务实例ID秒级任务使用开发者一般无需关心此参数 taskId 采用链式命名法的ID在某个任务实例内唯一开发者一般无需关心此参数 taskName task名称Map/MapReduce任务的子任务的值为开发者指定否则为系统默认值开发者一般无需关心此参数 jobParams 任务参数,其值等同于控制台录入的任务参数,常用! instanceParams 任务实例参数其值等同于使用OpenAPI运行任务实例时传递的参数常用 maxRetryTimes Task的最大重试次数 currentRetryTimes Task的当前重试次数和maxRetryTimes联合起来可以判断当前是否为该Task的最后一次运行机会 subTask 子TaskMap/MapReduce处理器专属开发者调用map方法时传递的子任务列表中的某一个 omsLogger 在线日志用法同Slf4J记录的日志可以直接通过控制台查看非常便捷和强大不过使用过程中需要注意频率可能对Server造成巨大的压力 方法的返回值为ProcessResult代表了本次Task执行的结果包含success和msg两个属性分别用于传递Task是否执行成功和Task需要返回的信息。" />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://kfcfans.gitee.io/ohmyscheduler/docs/startup/3-processor-develop/" />
<title>处理器开发 | OhMyScheduler</title>
<link rel="manifest" href="/ohmyscheduler/manifest.json">
<link rel="icon" href="/ohmyscheduler/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/ohmyscheduler/book.min.e161f1fe2b283b6a43c29a52fde96e2387fade573e78efa6701d44c8499da76b.css" integrity="sha256-4WHx/isoO2pDwppS/eluI4f63lc&#43;eO&#43;mcB1EyEmdp2s=">
<script defer src="/ohmyscheduler/en.search.min.8c0739389d262fcac555a16288129e832618f93ba193658e88e4cfeb51a71475.js" integrity="sha256-jAc5OJ0mL8rFVaFiiBKegyYY&#43;Tuhk2WOiOTP61GnFHU="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
</head>
<body dir=>
<input type="checkbox" class="hidden" id="menu-control" />
<main class="container flex">
<aside class="book-menu">
<nav>
<h2 class="book-brand">
<a href="/ohmyscheduler"><span>OhMyScheduler</span>
</a>
</h2>
<div class="book-search">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/" />
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
<ul>
<li>
<span>快速开始</span>
<ul>
<li>
<a href="/ohmyscheduler/docs/startup/1-server-startup/" class="">调度中心Server部署</a>
</li>
<li>
<a href="/ohmyscheduler/docs/startup/2-worker-startup/" class="">执行器Worker初始化</a>
</li>
<li>
<a href="/ohmyscheduler/docs/startup/3-processor-develop/" class="active">处理器开发</a>
</li>
<li>
<a href="/ohmyscheduler/docs/startup/4-console-guide/" class="">任务管理与在线运维</a>
</li>
</ul>
</li>
<li>
<span>高级特性</span>
<ul>
<li>
<a href="/ohmyscheduler/docs/super/container/" class="">容器</a>
</li>
<li>
<a href="/ohmyscheduler/docs/super/openapi/" class="">OpenAPI</a>
</li>
</ul>
</li>
<li>
<span>版本与升级</span>
<ul>
<li>
<a href="/ohmyscheduler/docs/version/update/" class="">更新日志</a>
</li>
<li>
<a href="/ohmyscheduler/docs/version/migrate/" class="">迁移指南</a>
</li>
</ul>
</li>
<li>
<a href="/ohmyscheduler/docs/todo/" class="">开发计划</a>
</li>
<li>
<a href="/ohmyscheduler/docs/faq/" class="">FAQ</a>
</li>
</ul>
</nav>
<script>(function(){var menu=document.querySelector("aside.book-menu nav");addEventListener("beforeunload",function(event){localStorage.setItem("menu.scrollTop",menu.scrollTop);});menu.scrollTop=localStorage.getItem("menu.scrollTop");})();</script>
</aside>
<div class="book-page">
<header class="book-header">
<div class="flex align-center justify-between">
<label for="menu-control">
<img src="/ohmyscheduler/svg/menu.svg" class="book-icon" alt="Menu" />
</label>
<strong>处理器开发</strong>
<label for="toc-control">
<img src="/ohmyscheduler/svg/toc.svg" class="book-icon" alt="Table of Contents" />
</label>
</div>
<input type="checkbox" class="hidden" id="toc-control" />
<aside class="hidden clearfix">
<nav id="TableOfContents">
<ul>
<li><a href="#处理器概述">处理器概述</a></li>
<li><a href="#核心方法process">核心方法process</a></li>
<li><a href="#单机处理器basicprocessor">单机处理器BasicProcessor</a></li>
<li><a href="#广播处理器broadcastprocessor">广播处理器BroadcastProcessor</a></li>
<li><a href="#并行处理器mapreduceprocessor">并行处理器MapReduceProcessor</a></li>
<li><a href="#最佳实践mapreduce实现静态分片">最佳实践MapReduce实现静态分片</a></li>
<li><a href="#最佳实践mapreduce多级分发处理">最佳实践MapReduce多级分发处理</a></li>
<li><a href="#更多示例">更多示例</a></li>
</ul>
</nav>
</aside>
</header>
<article class="markdown"><h2 id="处理器概述">处理器概述</h2>
<blockquote>
<p>OhMyScheduler当前支持Shell、Python等脚本处理器和Java处理器。脚本处理器只需要开发者完成脚本的编写xxx.sh / xxx.py在控制台填入脚本内容即可本章不再赘述。本章将重点阐述Java处理器开发方法与使用技巧。</p>
</blockquote>
<ul>
<li>Java处理器可根据<strong>代码所处位置</strong>划分为内置Java处理器和容器Java处理器前者直接集成在宿主应用也就是接入本系统的业务应用一般用来处理业务需求后者可以在一个独立的轻量级的Java工程中开发通过<strong>容器技术</strong>详见容器章节被worker集群热加载提供Java的“脚本能力”一般用于处理灵活多变的需求。</li>
<li>Java处理器可根据<strong>对象创建者</strong>划分为SpringBean处理器和普通Java对象处理器前者由Spring IOC容器完成处理器的创建和初始化后者则有OhMyScheduler维护其状态。如果宿主应用支持Spring<strong>强烈建议使用SpringBean处理器</strong>开发者仅需要将Processor注册进Spring IOC容器一个<code>@Component</code>注解或一句<code>bean</code>配置)。</li>
<li>Java处理器可根据<strong>功能</strong>划分为单机处理器、广播处理器、Map处理器和MapReduce处理器。
<ul>
<li>单机处理器(<code>BasicProcessor</code>)对应了单机任务,即某个任务的某次运行只会有某一台机器的某一个线程参与运算。</li>
<li>广播处理器(<code>BroadcastProcessor</code>)对应了广播任务,即某个任务的某次运行会<strong>调动集群内所有机器参与运算</strong></li>
<li>Map处理器<code>MapProcessor</code>对应了Map任务即某个任务在运行过程中<strong>允许产生子任务并分发到其他机器进行运算</strong></li>
<li>MapReduce处理器<code>MapReduceProcessor</code>对应了MapReduce任务在Map任务的基础上<strong>增加了所有任务结束后的汇总统计</strong></li>
</ul>
</li>
</ul>
<h2 id="核心方法process">核心方法process</h2>
<p>任意Java处理器都需要实现处理的核心方法其接口签名如下</p>
<div class="highlight"><pre style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4"><code class="language-java" data-lang="java">ProcessResult <span style="color:#a6e22e">process</span><span style="color:#f92672">(</span>TaskContext context<span style="color:#f92672">)</span> <span style="color:#66d9ef">throws</span> Exception<span style="color:#f92672">;</span>
</code></pre></div><p>方法入参<code>TaskContext</code>,包含了本次处理的上下文信息,具体属性如下:</p>
<table>
<thead>
<tr>
<th>属性名称</th>
<th>意义/用法</th>
</tr>
</thead>
<tbody>
<tr>
<td>jobId</td>
<td>任务ID开发者一般无需关心此参数</td>
</tr>
<tr>
<td>instanceId</td>
<td>任务实例ID全局唯一开发者一般无需关心此参数</td>
</tr>
<tr>
<td>subInstanceId</td>
<td>子任务实例ID秒级任务使用开发者一般无需关心此参数</td>
</tr>
<tr>
<td>taskId</td>
<td>采用链式命名法的ID在某个任务实例内唯一开发者一般无需关心此参数</td>
</tr>
<tr>
<td>taskName</td>
<td>task名称Map/MapReduce任务的子任务的值为开发者指定否则为系统默认值开发者一般无需关心此参数</td>
</tr>
<tr>
<td>jobParams</td>
<td>任务参数,其值等同于控制台录入的<strong>任务参数</strong>,常用!</td>
</tr>
<tr>
<td>instanceParams</td>
<td>任务实例参数其值等同于使用OpenAPI运行任务实例时传递的参数常用</td>
</tr>
<tr>
<td>maxRetryTimes</td>
<td>Task的最大重试次数</td>
</tr>
<tr>
<td>currentRetryTimes</td>
<td>Task的当前重试次数和maxRetryTimes联合起来可以判断当前是否为该Task的最后一次运行机会</td>
</tr>
<tr>
<td>subTask</td>
<td>子TaskMap/MapReduce处理器专属开发者调用map方法时传递的子任务列表中的某一个</td>
</tr>
<tr>
<td>omsLogger</td>
<td>在线日志用法同Slf4J记录的日志可以直接通过控制台查看非常便捷和强大不过使用过程中需要注意频率可能对Server造成巨大的压力</td>
</tr>
</tbody>
</table>
<p>方法的返回值为<code>ProcessResult</code>代表了本次Task执行的结果包含<code>success</code><code>msg</code>两个属性分别用于传递Task是否执行成功和Task需要返回的信息。</p>
<h2 id="单机处理器basicprocessor">单机处理器BasicProcessor</h2>
<p>单机执行的策略下server会在所有可用worker中选取健康度最佳的机器进行执行。单机执行任务需要实现接口<code>BasicProcessor</code>,代码示例如下:</p>
<div class="highlight"><pre style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4"><code class="language-java" data-lang="java"><span style="color:#75715e">// 支持 SpringBean 的形式
</span><span style="color:#75715e"></span><span style="color:#a6e22e">@Component</span>
<span style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">BasicProcessorDemo</span> <span style="color:#66d9ef">implements</span> BasicProcessor <span style="color:#f92672">{</span>
<span style="color:#a6e22e">@Resource</span>
<span style="color:#66d9ef">private</span> MysteryService mysteryService<span style="color:#f92672">;</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">process</span><span style="color:#f92672">(</span>TaskContext context<span style="color:#f92672">)</span> <span style="color:#66d9ef">throws</span> Exception <span style="color:#f92672">{</span>
<span style="color:#75715e">// 在线日志功能,可以直接在控制台查看任务日志,非常便捷
</span><span style="color:#75715e"></span> OmsLogger omsLogger <span style="color:#f92672">=</span> context<span style="color:#f92672">.</span><span style="color:#a6e22e">getOmsLogger</span><span style="color:#f92672">();</span>
omsLogger<span style="color:#f92672">.</span><span style="color:#a6e22e">info</span><span style="color:#f92672">(</span><span style="color:#e6db74">&#34;BasicProcessorDemo start to process, current JobParams is {}.&#34;</span><span style="color:#f92672">,</span> context<span style="color:#f92672">.</span><span style="color:#a6e22e">getJobParams</span><span style="color:#f92672">());</span>
<span style="color:#75715e">// TaskContext为任务的上下文信息包含了在控制台录入的任务元数据常用字段为
</span><span style="color:#75715e"></span> <span style="color:#75715e">// jobParams任务参数在控制台录入instanceParams任务实例参数通过 OpenAPI 触发的任务实例才可能存在该参数)
</span><span style="color:#75715e"></span>
<span style="color:#75715e">// 进行实际处理...
</span><span style="color:#75715e"></span> mysteryService<span style="color:#f92672">.</span><span style="color:#a6e22e">hasaki</span><span style="color:#f92672">();</span>
<span style="color:#75715e">// 返回结果,该结果会被持久化到数据库,在前端页面直接查看,极为方便
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;result is xxx&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#f92672">}</span>
</code></pre></div><h2 id="广播处理器broadcastprocessor">广播处理器BroadcastProcessor</h2>
<p>广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在<code>BasicProcessor</code>的基础上额外增加了<code>preProcess</code><code>postProcess</code>方法,分别在整个集群开始之前/结束之后<strong>选一台机器</strong>执行相关方法。代码示例如下:</p>
<div class="highlight"><pre style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4"><code class="language-java" data-lang="java"><span style="color:#a6e22e">@Component</span>
<span style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">BroadcastProcessorDemo</span> <span style="color:#66d9ef">extends</span> BroadcastProcessor <span style="color:#f92672">{</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">preProcess</span><span style="color:#f92672">(</span>TaskContext taskContext<span style="color:#f92672">)</span> <span style="color:#66d9ef">throws</span> Exception <span style="color:#f92672">{</span>
<span style="color:#75715e">// 预执行,会在所有 worker 执行 process 方法前调用
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;init success&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">process</span><span style="color:#f92672">(</span>TaskContext context<span style="color:#f92672">)</span> <span style="color:#66d9ef">throws</span> Exception <span style="color:#f92672">{</span>
<span style="color:#75715e">// 撰写整个worker集群都会执行的代码逻辑
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;release resource success&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">postProcess</span><span style="color:#f92672">(</span>TaskContext taskContext<span style="color:#f92672">,</span> List<span style="color:#f92672">&lt;</span>TaskResult<span style="color:#f92672">&gt;</span> taskResults<span style="color:#f92672">)</span> <span style="color:#66d9ef">throws</span> Exception <span style="color:#f92672">{</span>
<span style="color:#75715e">// taskResults 存储了所有worker执行的结果包括preProcess
</span><span style="color:#75715e"></span>
<span style="color:#75715e">// 收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;process success&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#f92672">}</span>
</code></pre></div><h2 id="并行处理器mapreduceprocessor">并行处理器MapReduceProcessor</h2>
<p>MapReduce是最复杂也是最强大的一种执行器它允许开发者完成任务的拆分将子任务派发到集群中其他Worker执行是执行大批量处理任务的不二之选实现MapReduce处理器需要继承<code>MapReduceProcessor</code>类,具体用法如下示例代码所示:</p>
<div class="highlight"><pre style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4"><code class="language-java" data-lang="java"><span style="color:#a6e22e">@Component</span>
<span style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">MapReduceProcessorDemo</span> <span style="color:#66d9ef">extends</span> MapReduceProcessor <span style="color:#f92672">{</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">process</span><span style="color:#f92672">(</span>TaskContext context<span style="color:#f92672">)</span> <span style="color:#66d9ef">throws</span> Exception <span style="color:#f92672">{</span>
<span style="color:#75715e">// 判断是否为根任务
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">if</span> <span style="color:#f92672">(</span>isRootTask<span style="color:#f92672">())</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// 构造子任务
</span><span style="color:#75715e"></span> List<span style="color:#f92672">&lt;</span>SubTask<span style="color:#f92672">&gt;</span> subTaskList <span style="color:#f92672">=</span> Lists<span style="color:#f92672">.</span><span style="color:#a6e22e">newLinkedList</span><span style="color:#f92672">();</span>
<span style="color:#75715e">/*
</span><span style="color:#75715e"> * 子任务的构造由开发者自己定义
</span><span style="color:#75715e"> * eg. 现在需要从文件中读取100W个ID并处理数据库中这些ID对应的数据那么步骤如下
</span><span style="color:#75715e"> * 1. 根任务RootTask读取文件流式拉取100W个ID并按1000个一批的大小组装成子任务进行派发
</span><span style="color:#75715e"> * 2. 非根任务获取子任务,完成业务逻辑的处理
</span><span style="color:#75715e"> */</span>
<span style="color:#75715e">// 调用 map 方法,派发子任务
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> map<span style="color:#f92672">(</span>subTaskList<span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;DATA_PROCESS_TASK&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#75715e">// 非子任务,可根据 subTask 的类型 或 TaskName 来判断分支
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">if</span> <span style="color:#f92672">(</span>context<span style="color:#f92672">.</span><span style="color:#a6e22e">getSubTask</span><span style="color:#f92672">()</span> <span style="color:#66d9ef">instanceof</span> SubTask<span style="color:#f92672">)</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// 执行子任务,注:子任务人可以 map 产生新的子任务,可以构建任意级的 MapReduce 处理器
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;PROCESS_SUB_TASK_SUCCESS&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">false</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;UNKNOWN_BUG&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">reduce</span><span style="color:#f92672">(</span>TaskContext taskContext<span style="color:#f92672">,</span> List<span style="color:#f92672">&lt;</span>TaskResult<span style="color:#f92672">&gt;</span> taskResults<span style="color:#f92672">)</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// 所有 Task 执行结束后reduce 将会被执行
</span><span style="color:#75715e"></span> <span style="color:#75715e">// taskResults 保存了所有子任务的执行结果
</span><span style="color:#75715e"></span>
<span style="color:#75715e">// 用法举例,统计执行结果
</span><span style="color:#75715e"></span> AtomicLong successCnt <span style="color:#f92672">=</span> <span style="color:#66d9ef">new</span> AtomicLong<span style="color:#f92672">(</span>0<span style="color:#f92672">);</span>
taskResults<span style="color:#f92672">.</span><span style="color:#a6e22e">forEach</span><span style="color:#f92672">(</span>tr <span style="color:#f92672">-&gt;</span> <span style="color:#f92672">{</span>
<span style="color:#66d9ef">if</span> <span style="color:#f92672">(</span>tr<span style="color:#f92672">.</span><span style="color:#a6e22e">isSuccess</span><span style="color:#f92672">())</span> <span style="color:#f92672">{</span>
successCnt<span style="color:#f92672">.</span><span style="color:#a6e22e">incrementAndGet</span><span style="color:#f92672">();</span>
<span style="color:#f92672">}</span>
<span style="color:#f92672">});</span>
<span style="color:#75715e">// 该结果将作为任务最终的执行结果
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;success task num:&#34;</span> <span style="color:#f92672">+</span> successCnt<span style="color:#f92672">.</span><span style="color:#a6e22e">get</span><span style="color:#f92672">());</span>
<span style="color:#f92672">}</span>
<span style="color:#75715e">// 自定义的子任务
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">private</span> <span style="color:#66d9ef">static</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">SubTask</span> <span style="color:#f92672">{</span>
<span style="color:#66d9ef">private</span> Long siteId<span style="color:#f92672">;</span>
<span style="color:#66d9ef">private</span> List<span style="color:#f92672">&lt;</span>Long<span style="color:#f92672">&gt;</span> idList<span style="color:#f92672">;</span>
<span style="color:#f92672">}</span>
<span style="color:#f92672">}</span>
</code></pre></div><p>Map处理器相当于MapReduce处理器的阉割版本阉割了<code>reduce</code>方法),此处不再单独举例。</p>
<h2 id="最佳实践mapreduce实现静态分片">最佳实践MapReduce实现静态分片</h2>
<p>虽然说这有点傻鸡焉用牛刀的感觉,不过既然目前市场上同类产品都处于静态分片的阶段,我也就在这里给大家举个例子吧~</p>
<div class="highlight"><pre style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4"><code class="language-java" data-lang="java"><span style="color:#a6e22e">@Component</span>
<span style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">StaticSliceProcessor</span> <span style="color:#66d9ef">extends</span> MapReduceProcessor <span style="color:#f92672">{</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">process</span><span style="color:#f92672">(</span>TaskContext context<span style="color:#f92672">)</span> <span style="color:#66d9ef">throws</span> Exception <span style="color:#f92672">{</span>
OmsLogger omsLogger <span style="color:#f92672">=</span> context<span style="color:#f92672">.</span><span style="color:#a6e22e">getOmsLogger</span><span style="color:#f92672">();</span>
<span style="color:#75715e">// root task 负责分发任务
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">if</span> <span style="color:#f92672">(</span>isRootTask<span style="color:#f92672">())</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// 从控制台传递分片参数架设格式为KV1=a&amp;2=b&amp;3=c
</span><span style="color:#75715e"></span> String jobParams <span style="color:#f92672">=</span> context<span style="color:#f92672">.</span><span style="color:#a6e22e">getJobParams</span><span style="color:#f92672">();</span>
Map<span style="color:#f92672">&lt;</span>String<span style="color:#f92672">,</span> String<span style="color:#f92672">&gt;</span> paramsMap <span style="color:#f92672">=</span> Splitter<span style="color:#f92672">.</span><span style="color:#a6e22e">on</span><span style="color:#f92672">(</span><span style="color:#e6db74">&#34;&amp;&#34;</span><span style="color:#f92672">).</span><span style="color:#a6e22e">withKeyValueSeparator</span><span style="color:#f92672">(</span><span style="color:#e6db74">&#34;=&#34;</span><span style="color:#f92672">).</span><span style="color:#a6e22e">split</span><span style="color:#f92672">(</span>jobParams<span style="color:#f92672">);</span>
List<span style="color:#f92672">&lt;</span>SubTask<span style="color:#f92672">&gt;</span> subTasks <span style="color:#f92672">=</span> Lists<span style="color:#f92672">.</span><span style="color:#a6e22e">newLinkedList</span><span style="color:#f92672">();</span>
paramsMap<span style="color:#f92672">.</span><span style="color:#a6e22e">forEach</span><span style="color:#f92672">((</span>k<span style="color:#f92672">,</span> v<span style="color:#f92672">)</span> <span style="color:#f92672">-&gt;</span> subTasks<span style="color:#f92672">.</span><span style="color:#a6e22e">add</span><span style="color:#f92672">(</span><span style="color:#66d9ef">new</span> SubTask<span style="color:#f92672">(</span>Integer<span style="color:#f92672">.</span><span style="color:#a6e22e">parseInt</span><span style="color:#f92672">(</span>k<span style="color:#f92672">),</span> v<span style="color:#f92672">)));</span>
<span style="color:#66d9ef">return</span> map<span style="color:#f92672">(</span>subTasks<span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;SLICE_TASK&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
Object subTask <span style="color:#f92672">=</span> context<span style="color:#f92672">.</span><span style="color:#a6e22e">getSubTask</span><span style="color:#f92672">();</span>
<span style="color:#66d9ef">if</span> <span style="color:#f92672">(</span>subTask <span style="color:#66d9ef">instanceof</span> SubTask<span style="color:#f92672">)</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// 实际处理
</span><span style="color:#75715e"></span> <span style="color:#75715e">// 当然,如果觉得 subTask 还是很大,也可以继续分发哦
</span><span style="color:#75715e"></span>
<span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;subTask:&#34;</span> <span style="color:#f92672">+</span> <span style="color:#f92672">((</span>SubTask<span style="color:#f92672">)</span> subTask<span style="color:#f92672">).</span><span style="color:#a6e22e">getIndex</span><span style="color:#f92672">()</span> <span style="color:#f92672">+</span> <span style="color:#e6db74">&#34; process successfully&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">false</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;UNKNOWN BUG&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">reduce</span><span style="color:#f92672">(</span>TaskContext context<span style="color:#f92672">,</span> List<span style="color:#f92672">&lt;</span>TaskResult<span style="color:#f92672">&gt;</span> taskResults<span style="color:#f92672">)</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// 按需求做一些统计工作... 不需要的话,直接使用 Map 处理器即可
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;xxxx&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#a6e22e">@Getter</span>
<span style="color:#a6e22e">@NoArgsConstructor</span>
<span style="color:#a6e22e">@AllArgsConstructor</span>
<span style="color:#66d9ef">private</span> <span style="color:#66d9ef">static</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">SubTask</span> <span style="color:#f92672">{</span>
<span style="color:#66d9ef">private</span> <span style="color:#66d9ef">int</span> index<span style="color:#f92672">;</span>
<span style="color:#66d9ef">private</span> String params<span style="color:#f92672">;</span>
<span style="color:#f92672">}</span>
<span style="color:#f92672">}</span>
</code></pre></div><h2 id="最佳实践mapreduce多级分发处理">最佳实践MapReduce多级分发处理</h2>
<p>利用MapReduce实现 Root -&gt; A -&gt; B/C -&gt; Reduce的DAG 工作流。</p>
<div class="highlight"><pre style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4"><code class="language-java" data-lang="java"><span style="color:#a6e22e">@Component</span>
<span style="color:#66d9ef">public</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">DAGSimulationProcessor</span> <span style="color:#66d9ef">extends</span> MapReduceProcessor <span style="color:#f92672">{</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">process</span><span style="color:#f92672">(</span>TaskContext context<span style="color:#f92672">)</span> <span style="color:#66d9ef">throws</span> Exception <span style="color:#f92672">{</span>
<span style="color:#66d9ef">if</span> <span style="color:#f92672">(</span>isRootTask<span style="color:#f92672">())</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// L1. 执行根任务
</span><span style="color:#75715e"></span>
<span style="color:#75715e">// 执行完毕后产生子任务 A需要传递的参数可以作为 TaskA 的属性进行传递
</span><span style="color:#75715e"></span> TaskA taskA <span style="color:#f92672">=</span> <span style="color:#66d9ef">new</span> TaskA<span style="color:#f92672">();</span>
<span style="color:#66d9ef">return</span> map<span style="color:#f92672">(</span>Lists<span style="color:#f92672">.</span><span style="color:#a6e22e">newArrayList</span><span style="color:#f92672">(</span>taskA<span style="color:#f92672">),</span> <span style="color:#e6db74">&#34;LEVEL1_TASK_A&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#66d9ef">if</span> <span style="color:#f92672">(</span>context<span style="color:#f92672">.</span><span style="color:#a6e22e">getSubTask</span><span style="color:#f92672">()</span> <span style="color:#66d9ef">instanceof</span> TaskA<span style="color:#f92672">)</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// L2. 执行A任务
</span><span style="color:#75715e"></span>
<span style="color:#75715e">// 执行完成后产生子任务 BC并行执行
</span><span style="color:#75715e"></span> TaskB taskB <span style="color:#f92672">=</span> <span style="color:#66d9ef">new</span> TaskB<span style="color:#f92672">();</span>
TaskC taskC <span style="color:#f92672">=</span> <span style="color:#66d9ef">new</span> TaskC<span style="color:#f92672">();</span>
<span style="color:#66d9ef">return</span> map<span style="color:#f92672">(</span>Lists<span style="color:#f92672">.</span><span style="color:#a6e22e">newArrayList</span><span style="color:#f92672">(</span>taskB<span style="color:#f92672">,</span> taskC<span style="color:#f92672">),</span> <span style="color:#e6db74">&#34;LEVEL2_TASK_BC&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#66d9ef">if</span> <span style="color:#f92672">(</span>context<span style="color:#f92672">.</span><span style="color:#a6e22e">getSubTask</span><span style="color:#f92672">()</span> <span style="color:#66d9ef">instanceof</span> TaskB<span style="color:#f92672">)</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// L3. 执行B任务
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;xxx&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#66d9ef">if</span> <span style="color:#f92672">(</span>context<span style="color:#f92672">.</span><span style="color:#a6e22e">getSubTask</span><span style="color:#f92672">()</span> <span style="color:#66d9ef">instanceof</span> TaskC<span style="color:#f92672">)</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// L3. 执行C任务
</span><span style="color:#75715e"></span> <span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;xxx&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">false</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;UNKNOWN_TYPE_OF_SUB_TASK&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#a6e22e">@Override</span>
<span style="color:#66d9ef">public</span> ProcessResult <span style="color:#a6e22e">reduce</span><span style="color:#f92672">(</span>TaskContext context<span style="color:#f92672">,</span> List<span style="color:#f92672">&lt;</span>TaskResult<span style="color:#f92672">&gt;</span> taskResults<span style="color:#f92672">)</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// L4. 执行最终 Reduce 任务taskResults保存了之前所有任务的结果
</span><span style="color:#75715e"></span> taskResults<span style="color:#f92672">.</span><span style="color:#a6e22e">forEach</span><span style="color:#f92672">(</span>taskResult <span style="color:#f92672">-&gt;</span> <span style="color:#f92672">{</span>
<span style="color:#75715e">// do something...
</span><span style="color:#75715e"></span> <span style="color:#f92672">});</span>
<span style="color:#66d9ef">return</span> <span style="color:#66d9ef">new</span> ProcessResult<span style="color:#f92672">(</span><span style="color:#66d9ef">true</span><span style="color:#f92672">,</span> <span style="color:#e6db74">&#34;reduce success&#34;</span><span style="color:#f92672">);</span>
<span style="color:#f92672">}</span>
<span style="color:#66d9ef">private</span> <span style="color:#66d9ef">static</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">TaskA</span> <span style="color:#f92672">{</span>
<span style="color:#f92672">}</span>
<span style="color:#66d9ef">private</span> <span style="color:#66d9ef">static</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">TaskB</span> <span style="color:#f92672">{</span>
<span style="color:#f92672">}</span>
<span style="color:#66d9ef">private</span> <span style="color:#66d9ef">static</span> <span style="color:#66d9ef">class</span> <span style="color:#a6e22e">TaskC</span> <span style="color:#f92672">{</span>
<span style="color:#f92672">}</span>
<span style="color:#f92672">}</span>
</code></pre></div><h2 id="更多示例">更多示例</h2>
<p>没看够?更多示例请见:<a href="https://github.com/KFCFans/OhMyScheduler/tree/master/oh-my-scheduler-worker-samples">oh-my-scheduler-worker-samples</a></p>
</article>
<footer class="book-footer">
<div class="flex flex-wrap justify-between">
</div>
</footer>
<div class="book-comments">
</div>
<label for="menu-control" class="hidden book-menu-overlay"></label>
</div>
<aside class="book-toc">
<nav id="TableOfContents">
<ul>
<li><a href="#处理器概述">处理器概述</a></li>
<li><a href="#核心方法process">核心方法process</a></li>
<li><a href="#单机处理器basicprocessor">单机处理器BasicProcessor</a></li>
<li><a href="#广播处理器broadcastprocessor">广播处理器BroadcastProcessor</a></li>
<li><a href="#并行处理器mapreduceprocessor">并行处理器MapReduceProcessor</a></li>
<li><a href="#最佳实践mapreduce实现静态分片">最佳实践MapReduce实现静态分片</a></li>
<li><a href="#最佳实践mapreduce多级分发处理">最佳实践MapReduce多级分发处理</a></li>
<li><a href="#更多示例">更多示例</a></li>
</ul>
</nav>
</aside>
</main>
</body>
</html>