PowerJob/others/guidance/public/en.search-data.min.0bce94d64b2262fe2893b195a0eb01809131e0e8820b4a43c2f6d7a0c71d537f.js
2020-05-21 13:11:06 +08:00

1 line
36 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'use strict';(function(){const indexCfg={cache:true};indexCfg.doc={id:'id',field:['title','content'],store:['title','href'],};const index=FlexSearch.create('balance',indexCfg);window.bookSearchIndex=index;index.add({'id':0,'href':'/docs/super/container/','title':"容器",'content':"什么是容器? 介绍 OhMyScheduler的容器技术允许开发者开发独立于Worker项目之外Java处理器简单来说就是以Maven工程项目的维度去组织一堆Java文件开发者开发的众多脚本处理器进而兼具开发效率和可维护性。\n用途举例 比如突然出现了某个数据库数据清理任务与主业务无关写进原本的项目工程中不太优雅这时候就可以单独创建一个用于数据操作的容器在里面完成处理器的开发通过OhMyScheduler的容器部署技术在Worker集群上被加载执行。 比如常见的日志清理啊机器状态上报啊对于广大Java程序员来说也许并不是很会写shell脚本此时也可以借用agent+容器技术利用Java完成各项原本需要通过脚本进行的操作。 (感觉例子举的都不是很好\u0026hellip;这个东西嘛,只可意会不可言传,大家努力理解一下吧~超好用哦~)\n生成容器模版 为了方便开发者使用,最新版本的前端页面已经支持容器工程模版的自动生成,开发者仅需要填入相关信息即可下载容器模版开始开发。 Group对应Maven的\u0026lt;groupId\u0026gt;标签,一般填入倒写的公司域名。 Artifact对于Maven的\u0026lt;artifactId\u0026gt;标签,填入代表该容器的唯一标示。 Name对应Maven的\u0026lt;name\u0026gt;标签,填入该容器名称。 Package Name包名代表了的容器工程内部所使用的包名警告包名一旦生成后请勿更改否则会导致运行时容器加载错误当然如有必须修改包名的需求可以尝试替换/resource下以oms-worker-container开头的所有文件相关的值。 Java Version容器工程的Java版本请务必与容器目标部署Worker平台的Java版本保持一致。 开发容器工程 完成容器模版创建后下载解压会得到如下结构的Java工程\noms-template-origin // 工程名称,可以自由更改 ├── pom.xml └── src ├── main │ ├── java │ │ └── cn │ │ └── edu │ │ └── zju │ │ └── tjq │ │ └── container │ │ └── samples // 所有处理器代码必须位于该目录下,其余类随意 │ └── resources // 严禁随意更改以下两个配置文件(允许添加,不允许更改现有内容) │ ├── oms-worker-container-spring-context.xml │ └── oms-worker-container.properties └── test └── java 之后便可以愉快地在packageName下编写处理器代码啦\n需要示例代码客官这边请\n创建容器 目前OhMyScheduler支持使用Git代码库和FatJar来创建容器。创建路径容器运维 -\u0026gt; 容器管理 -\u0026gt; 新建容器。\n当使用Git代码库创建容器时OhMyScheduler-Server需要完成代码库的下载、编译、构建和上传因此需要server运行环境包含可用的Git和Maven环境包括私服的访问权限。 下图为使用Git代码库创建容器的示例需要填入容器名称和代码库信息等参数\n下图为使用FatJar创建容器的示例需要上传可用的FatJarFatJar值包含了所有依赖的Jar文件\n部署容器 完成容器创建后,便可在容器管理界面查看已创建的容器,点击部署,可以看到详细的部署信息。 部署完成后,可以点击机器列表查看已部署该容器的机器信息。 "});index.add({'id':1,'href':'/docs/startup/','title':"快速开始",'content':""});index.add({'id':2,'href':'/docs/startup/1-server-startup/','title':"调度中心Server部署",'content':"环境要求 Open JDK 8+\n Apache Maven 3+\n 任意 Spring Data Jpa 支持的关系型数据库MySQL/Oracle/MS SQLServer\u0026hellip;\n MongoDB可选任意支持GridFS的mongoDB版本4.2.6测试通过,其余未经测试,仅从理论角度分析可用),缺失该组件的情况下将无法使用在线日志、容器部署等扩展功能\n 初始化关系数据库 调度服务器oh-my-scheduler-server的持久化层基于Spring Boot Jpa实现对于能够直连数据库的应用开发者仅需完成数据库的创建即运行SQLCREATE database if NOT EXISTS oms-product default character set utf8mb4 collate utf8mb4_unicode_ci;`\n OhMyScheduler支持环境隔离提供日常daily、预发pre和线上product三套环境请根据使用的环境分别部署对应的数据库oms-daily、oms-pre和oms-product。 调度服务器属于时间敏感应用强烈建议检查当前数据库所使用的时区信息show variables like \u0026quot;%time_zone%\u0026quot;;务必确保time_zone代表的时区与JDBC URL中serverTimezone字段代表的时区一致 手动建表表SQL文件下载地址 部署调度服务器—源码编译 调度服务器oh-my-scheduler-server支持任意的水平扩展即多实例集群部署仅需要在同一个局域网内启动新的服务器实例性能强劲无上限\n 调度服务器oh-my-scheduler-server为了支持环境隔离分别采用了日常application-daily.properties、预发application-pre.properties和线上application-product.properties三套配置文件请根据实际需求进行修改以下为配置文件详解。\n 配置项 含义 可选 server.port SpringBoot配置HTTP端口号默认7700 否 oms.akka.port OhMyScheduler配置Akka端口号默认10086 否 oms.alarm.bean.names OhMyScheduler报警服务Bean名称多值逗号分隔 是 spring.datasource.core.xxx 关系型数据库连接配置 否 spring.mail.xxx 邮件配置 是,未配置情况下将无法使用邮件报警功能 spring.data.mongodb.xxx MongoDB连接配置 是,未配置情况下将无法使用在线日志功能 oms.log.retention.local 本地日志保留天数,负数代表永久保留 否 oms.log.retention.remote 远程日志保留天数,负数代表永久保留 否 oms.container.retention.local 扩展的报警服务Bean多值逗号分割默认为邮件报警 否 oms.container.retention.remote 远程容器保留天数,负数代表永久保留 否 完成配置文件修改后,即可正式开始部署:\n 打包运行mvn clean package -U -Pdev -DskipTests构建调度中心Jar文件。 运行运行java -jar oms-server.jar --spring.profiles.active=product指定生效的配置文件。 验证访问http://ip:port查看是否出现OhMyScheduler的欢迎页面。 部署调度服务器—Docker 建议自己根据项目中的Dockerfile稍作修改制作自己的Docker镜像而不是直接使用官方镜像原因在于容器功能需要用到Git和Maven来编译代码库而公司内部往往都会搭建自己的私有仓库所以Git容器功能没办法正常运行官方镜像中的调度服务器不支持Git容器的部署。 Docker Hub地址\n部署流程\n 下载镜像docker pull tjqq/oms-server 创建容器并运行所有SpringBoot的启动参数都可通过-e Params=\u0026quot;\u0026quot;传入) docker run -d -e PARAMS=\u0026#34;--spring.profiles.active=product\u0026#34; -p 7700:7700 -p 10086:10086 -p 27777:27777 --name oms-server -v ~/docker/oms-server:/root/oms-server tjqq/oms-server:$version 单独部署前端页面(可选) 每一个oh-my-scheduler-server都自带了前端页面不过Tomcat为了完善的WebSocket支持现已切换到Undertow做Web服务器的性能就呵呵了看评测好像还行不过有追求的用户还是建议单独使用源码部署\n 源码克隆OhMyScheduler-Console 替换地址修改main.js中的axios.defaults.baseURL为服务器地址 npm run build -\u0026gt; nginx config 特别鸣谢:感谢某知名上市电商公司前端开发者对本项目的大力支持!\n初始化应用分组 每一个业务系统初次接入OhMyScheduler时都需要先完成应用注册。\n 应用注册,用于进行业务分组: 应用名称关键参数一般填入接入的业务应用名称即可需要保证唯一。同一个应用名称的所有worker视为一个集群被调度中心调度。 应用描述:可选参数,便于记忆,无实际用处。 用户注册,用于收集报警信息,用户注册录入个人信息后,即可通过报警配置进行通知。 "});index.add({'id':3,'href':'/docs/super/openapi/','title':"OpenAPI",'content':"OpenAPI允许开发者通过接口来完成手工的操作让系统整体变得更加灵活。开发者可以基于API便捷地扩展OhMyScheduler原有的功能。 依赖 最新依赖版本请参考Maven中央仓库推荐地址\u0026amp;备用地址。\n\u0026lt;dependency\u0026gt; \u0026lt;groupId\u0026gt;com.github.kfcfans\u0026lt;/groupId\u0026gt; \u0026lt;artifactId\u0026gt;oh-my-scheduler-client\u0026lt;/artifactId\u0026gt; \u0026lt;version\u0026gt;1.2.0\u0026lt;/version\u0026gt; \u0026lt;/dependency\u0026gt; 简单示例 // 初始化 client需要server地址和应用名称作为参数 OhMyClient ohMyClient = new OhMyClient(\u0026#34;127.0.0.1:7700\u0026#34;, \u0026#34;oms-test\u0026#34;); // 调用相关的API ohMyClient.stopInstance(1586855173043L) API列表 创建/修改任务 接口签名ResultDTO\u0026lt;Long\u0026gt; saveJob(ClientJobInfo newJobInfo)\n入参任务信息详细说明见下表也可以参考前端任务创建各参数的正确填法\n返回值ResultDTO根据success判断操作是否成功。若操作成功data字段返回任务ID\n 属性 说明 jobId 任务ID可选null代表创建任务否则填写需要修改的任务ID jobName 任务名称 jobDescription 任务描述 jobParams 任务参数Processor#process方法入参TaskContext对象的jobParams字段 timeExpressionType 时间表达式类型,枚举值 timeExpression 时间表达式填写类型由timeExpressionType决定比如CRON需要填写CRON表达式 executeType 执行类型,枚举值 processorType 处理器类型,枚举值 processorInfo 处理器参数填写类型由processorType决定如Java处理器需要填写全限定类名com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo maxInstanceNum 最大实例数,该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例) concurrency 单机线程并发数表示该实例执行过程中每个Worker使用的线程数量 instanceTimeLimit 任务实例运行时间限制0代表无任何限制超时会被打断并判定为执行失败 instanceRetryNum 任务实例重试次数,整个任务失败时重试,代价大,不推荐使用 taskRetryNum Task重试次数每个子Task失败后单独重试代价小推荐使用 minCpuCores 最小可用CPU核心数CPU可用核心数小于该值的Worker将不会执行该任务0代表无任何限制 minMemorySpace 最小内存大小GB可用内存小于该值的Worker将不会执行该任务0代表无任何限制 minDiskSpace 最小磁盘大小GB可用磁盘空间小于该值的Worker将不会执行该任务0代表无任何限制 designatedWorkers 指定机器执行设置该参数后只有列表中的机器允许执行该任务0代表无任何限制 maxWorkerCount 最大执行机器数量,限定调动执行的机器数量,空代表无限制 notifyUserIds 接收报警的用户ID列表 enable 是否启用该任务,未启用的任务不会被调度 查找任务 接口签名ResultDTO\u0026lt;JobInfoDTO\u0026gt; fetchJob(Long jobId)\n入参任务ID\n返回值根据success判断操作是否成功若请求成功则返回任务的详细信息\n禁用某个任务 接口签名ResultDTO\u0026lt;Void\u0026gt; disableJob(Long jobId)\n入参任务ID\n返回值根据success判断操作是否成功\n启用某个任务 接口签名ResultDTO\u0026lt;Void\u0026gt; enableJob(Long jobId)\n入参任务ID\n返回值根据success判断操作是否成功\n删除某个任务 接口签名ResultDTO\u0026lt;Void\u0026gt; deleteJob(Long jobId)\n入参任务ID\n返回值根据success判断操作是否成功\n立即运行某个任务 接口签名ResultDTO\u0026lt;Long\u0026gt; runJob(Long jobId, String instanceParams)\n入参任务ID + 任务实例参数Processor#process方法入参TaskContext对象的instanceParams字段\n返回值根据success判断操作是否成功操作成功返回对应的任务实例ID(instanceId)\n停止某个任务实例 接口签名ResultDTO\u0026lt;Void\u0026gt; stopInstance(Long instanceId)\n入参任务实例ID\n返回值根据success判断操作是否成功\n查询某个任务实例 接口签名ResultDTO\u0026lt;InstanceInfoDTO\u0026gt; fetchInstanceInfo(Long instanceId)\n入参任务实例ID\n返回值根据success判断操作是否成功操作成功返回任务实例的详细信息\n查询某个任务实例的状态 接口签名ResultDTO\u0026lt;Integer\u0026gt; fetchInstanceStatus(Long instanceId)\n入参任务实例ID\n返回值根据success判断操作是否成功操作成功返回任务实例的状态码对应的枚举为InstanceStatus\n"});index.add({'id':4,'href':'/docs/startup/2-worker-startup/','title':"执行器Worker初始化",'content':"基于宿主应用的执行器初始化 宿主应用即原有的业务应用,假如需要调度执行的任务与当前业务有较为紧密的联系,建议采取该方式。\n 首先添加相关的jar包依赖最新依赖版本请参考maven中央仓库推荐地址\u0026amp;备用地址\n\u0026lt;dependency\u0026gt; \u0026lt;groupId\u0026gt;com.github.kfcfans\u0026lt;/groupId\u0026gt; \u0026lt;artifactId\u0026gt;oh-my-scheduler-worker\u0026lt;/artifactId\u0026gt; \u0026lt;version\u0026gt;1.2.0\u0026lt;/version\u0026gt; \u0026lt;/dependency\u0026gt; 其次填写执行器客户端配置文件OhMyConfig各参数说明如下表所示\n 属性名称 含义 默认值 appName 宿主应用名称,需要提前在控制台完成注册 无,必填项,否则启动报错 port Worker工作端口 27777 serverAddress 调度中心oh-my-scheduler-server地址列表 无,必填项,否则启动报错 storeStrategy 本地存储策略,枚举值磁盘/内存大型MapReduce等会产生大量Task的任务推荐使用磁盘降低内存压力否则建议使用内存加速计算 StoreStrategy.DISK磁盘 maxResultLength 每个Task返回结果的默认长度超长将被截断。过长可能导致网络拥塞 8096 enableTestMode 是否启用测试模式启用后无需Server也能顺利启动OhMyScheduler-Worker用于处理器本地的单元测试 false 最后,初始化客户端,完成执行器的启动,代码示例如下:\n@Configuration public class OhMySchedulerConfig { @Bean public OhMyWorker initOMS() throws Exception { // 服务器HTTP地址端口号为 server.port而不是 ActorSystem port List\u0026lt;String\u0026gt; serverAddress = Lists.newArrayList(\u0026#34;127.0.0.1:7700\u0026#34;, \u0026#34;127.0.0.1:7701\u0026#34;); // 1. 创建配置文件 OhMyConfig config = new OhMyConfig(); config.setPort(27777); config.setAppName(\u0026#34;oms-test\u0026#34;); config.setServerAddress(serverAddress); // 如果没有大型 Map/MapReduce 的需求,建议使用内存来加速计算 // 为了本地模拟多个实例,只能使用 MEMORY 启动(文件只能由一个应用占有) config.setStoreStrategy(StoreStrategy.MEMORY); // 2. 创建 Worker 对象,设置配置文件 OhMyWorker ohMyWorker = new OhMyWorker(); ohMyWorker.setConfig(config); return ohMyWorker; } } 非Spring应用程序在创建OhMyWorker对象后手动调用ohMyWorker.init()方法完成初始化即可。\n OhMyScheduler日志单独配置\n目前OhMyScheduler-Worker并没有实现自己的LogFactory如果有需求的话请提ISSUE可以考虑实现原因如下\n OhMyScheduler-Worker的日志基于Slf4J输出即采用了基于门面设计模式的日志框架宿主应用无论如何都可以搭起Slf4J与实际的日志框架这座桥梁。 减轻了部分开发工作量不再需要实现自己的LogFactory虽然不怎么难就是了\u0026hellip;)。 为此为了顺利且友好地输出日志请在日志配置文件logback.xml/log4j2.xml/\u0026hellip;中为OhMyScheduler-Worker单独进行日志配置比如logback示例\n\u0026lt;appender name=\u0026#34;OMS_WORKER_APPENDER\u0026#34; class=\u0026#34;ch.qos.logback.core.rolling.RollingFileAppender\u0026#34;\u0026gt; \u0026lt;file\u0026gt;${LOG_PATH}/oms-worker.log\u0026lt;/file\u0026gt; \u0026lt;rollingPolicy class=\u0026#34;ch.qos.logback.core.rolling.TimeBasedRollingPolicy\u0026#34;\u0026gt; \u0026lt;FileNamePattern\u0026gt;${LOG_PATH}/oms-worker.%d{yyyy-MM-dd}.log\u0026lt;/FileNamePattern\u0026gt; \u0026lt;MaxHistory\u0026gt;7\u0026lt;/MaxHistory\u0026gt; \u0026lt;/rollingPolicy\u0026gt; \u0026lt;encoder class=\u0026#34;ch.qos.logback.classic.encoder.PatternLayoutEncoder\u0026#34;\u0026gt; \u0026lt;pattern\u0026gt;%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n\u0026lt;/pattern\u0026gt; \u0026lt;charset\u0026gt;UTF-8\u0026lt;/charset\u0026gt; \u0026lt;/encoder\u0026gt; \u0026lt;append\u0026gt;true\u0026lt;/append\u0026gt; \u0026lt;/appender\u0026gt; \u0026lt;logger name=\u0026#34;com.github.kfcfans.oms.worker\u0026#34; level=\u0026#34;INFO\u0026#34; additivity=\u0026#34;false\u0026#34;\u0026gt; \u0026lt;appender-ref ref=\u0026#34;OMS_WORKER_APPENDER\u0026#34; /\u0026gt; \u0026lt;/logger\u0026gt; 无论如何OhMyScheduler-Worker启动时都会打印Banner如下所示您可以通过Banner来判断日志配置是否成功emmm\u0026hellip;Markdown显示似乎有点丑实际上超帅的呢\n███████ ██ ████ ████ ████████ ██ ██ ██ ██░░░░░██ ░██ ░██░██ ██░██ ██ ██ ██░░░░░░ ░██ ░██ ░██ ██ ░░██░██ ░██░░██ ██ ░██ ░░██ ██ ░██ █████ ░██ █████ ░██ ██ ██ ░██ █████ ██████ ░██ ░██░██████ ░██ ░░███ ░██ ░░███ ░█████████ ██░░░██░██████ ██░░░██ ██████░██ ░██ ░██ ██░░░██░░██░░█ ░██ ░██░██░░░██░██ ░░█ ░██ ░██ ░░░░░░░░██░██ ░░ ░██░░░██░███████ ██░░░██░██ ░██ ░██░███████ ░██ ░ ░░██ ██ ░██ ░██░██ ░ ░██ ██ ░██░██ ██░██ ░██░██░░░░ ░██ ░██░██ ░██ ░██░██░░░░ ░██ ░░███████ ░██ ░██░██ ░██ ██ ████████ ░░█████ ░██ ░██░░██████░░██████░░██████ ███░░██████░███ ░░░░░░░ ░░ ░░ ░░ ░░ ░░ ░░░░░░░░ ░░░░░ ░░ ░░ ░░░░░░ ░░░░░░ ░░░░░░ ░░░ ░░░░░░ ░░░ 基于agent的执行器初始化 agent是一个没有任何业务逻辑的执行器其实就是为worker加了一个main方法。\n 代码编译方式启动示例java -jar oh-my-scheduler-worker-agent-1.2.0.jar -a my-agent\nUsage: OhMyAgent [-hV] -a=\u0026lt;appName\u0026gt; [-e=\u0026lt;storeStrategy\u0026gt;] [-l=\u0026lt;length\u0026gt;] [-p=\u0026lt;port\u0026gt;] [-s=\u0026lt;server\u0026gt;] OhMyScheduler-Worker代理 -a, --app=\u0026lt;appName\u0026gt; worker-agent名称可通过调度中心控制台创建 -e, --persistence=\u0026lt;storeStrategy\u0026gt; 存储策略枚举值DISK 或 MEMORY -h, --help Show this help message and exit. -l, --length=\u0026lt;length\u0026gt; 返回值最大长度 -p, --port=\u0026lt;port\u0026gt; worker-agent的ActorSystem监听端口不建议更改 -s, --server=\u0026lt;server\u0026gt; 调度中心地址,多值英文逗号分隔,格式 IP:Port OR domain -V, --version Print version information and exit. Docker镜像Docker Hub\n"});index.add({'id':5,'href':'/docs/super/','title':"高级特性",'content':""});index.add({'id':6,'href':'/docs/startup/3-processor-develop/','title':"处理器开发",'content':"处理器概述 OhMyScheduler当前支持Shell、Python等脚本处理器和Java处理器。脚本处理器只需要开发者完成脚本的编写xxx.sh / xxx.py在控制台填入脚本内容即可本章不再赘述。本章将重点阐述Java处理器开发方法与使用技巧。\n Java处理器可根据代码所处位置划分为内置Java处理器和容器Java处理器前者直接集成在宿主应用也就是接入本系统的业务应用一般用来处理业务需求后者可以在一个独立的轻量级的Java工程中开发通过容器技术详见容器章节被worker集群热加载提供Java的“脚本能力”一般用于处理灵活多变的需求。 Java处理器可根据对象创建者划分为SpringBean处理器和普通Java对象处理器前者由Spring IOC容器完成处理器的创建和初始化后者则有OhMyScheduler维护其状态。如果宿主应用支持Spring强烈建议使用SpringBean处理器开发者仅需要将Processor注册进Spring IOC容器一个@Component注解或一句bean配置。 Java处理器可根据功能划分为单机处理器、广播处理器、Map处理器和MapReduce处理器。 单机处理器BasicProcessor对应了单机任务即某个任务的某次运行只会有某一台机器的某一个线程参与运算。 广播处理器BroadcastProcessor对应了广播任务即某个任务的某次运行会调动集群内所有机器参与运算。 Map处理器MapProcessor对应了Map任务即某个任务在运行过程中允许产生子任务并分发到其他机器进行运算。 MapReduce处理器MapReduceProcessor对应了MapReduce任务在Map任务的基础上增加了所有任务结束后的汇总统计。 核心方法process 任意Java处理器都需要实现处理的核心方法其接口签名如下\nProcessResult process(TaskContext context) throws Exception; 方法入参TaskContext包含了本次处理的上下文信息具体属性如下\n 属性名称 意义/用法 jobId 任务ID开发者一般无需关心此参数 instanceId 任务实例ID全局唯一开发者一般无需关心此参数 subInstanceId 子任务实例ID秒级任务使用开发者一般无需关心此参数 taskId 采用链式命名法的ID在某个任务实例内唯一开发者一般无需关心此参数 taskName task名称Map/MapReduce任务的子任务的值为开发者指定否则为系统默认值开发者一般无需关心此参数 jobParams 任务参数,其值等同于控制台录入的任务参数,常用! instanceParams 任务实例参数其值等同于使用OpenAPI运行任务实例时传递的参数常用 maxRetryTimes Task的最大重试次数 currentRetryTimes Task的当前重试次数和maxRetryTimes联合起来可以判断当前是否为该Task的最后一次运行机会 subTask 子TaskMap/MapReduce处理器专属开发者调用map方法时传递的子任务列表中的某一个 omsLogger 在线日志用法同Slf4J记录的日志可以直接通过控制台查看非常便捷和强大不过使用过程中需要注意频率可能对Server造成巨大的压力 方法的返回值为ProcessResult代表了本次Task执行的结果包含success和msg两个属性分别用于传递Task是否执行成功和Task需要返回的信息。\n单机处理器BasicProcessor 单机执行的策略下server会在所有可用worker中选取健康度最佳的机器进行执行。单机执行任务需要实现接口BasicProcessor代码示例如下\n// 支持 SpringBean 的形式 @Component public class BasicProcessorDemo implements BasicProcessor { @Resource private MysteryService mysteryService; @Override public ProcessResult process(TaskContext context) throws Exception { // 在线日志功能,可以直接在控制台查看任务日志,非常便捷 OmsLogger omsLogger = context.getOmsLogger(); omsLogger.info(\u0026#34;BasicProcessorDemo start to process, current JobParams is {}.\u0026#34;, context.getJobParams()); // TaskContext为任务的上下文信息包含了在控制台录入的任务元数据常用字段为 // jobParams任务参数在控制台录入instanceParams任务实例参数通过 OpenAPI 触发的任务实例才可能存在该参数) // 进行实际处理... mysteryService.hasaki(); // 返回结果,该结果会被持久化到数据库,在前端页面直接查看,极为方便 return new ProcessResult(true, \u0026#34;result is xxx\u0026#34;); } } 广播处理器BroadcastProcessor 广播执行的策略下所有机器都会被调度执行该任务。为了便于资源的准备和释放广播处理器在BasicProcessor的基础上额外增加了preProcess和postProcess方法分别在整个集群开始之前/结束之后选一台机器执行相关方法。代码示例如下:\n@Component public class BroadcastProcessorDemo extends BroadcastProcessor { @Override public ProcessResult preProcess(TaskContext taskContext) throws Exception { // 预执行,会在所有 worker 执行 process 方法前调用 return new ProcessResult(true, \u0026#34;init success\u0026#34;); } @Override public ProcessResult process(TaskContext context) throws Exception { // 撰写整个worker集群都会执行的代码逻辑 return new ProcessResult(true, \u0026#34;release resource success\u0026#34;); } @Override public ProcessResult postProcess(TaskContext taskContext, List\u0026lt;TaskResult\u0026gt; taskResults) throws Exception { // taskResults 存储了所有worker执行的结果包括preProcess // 收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果 return new ProcessResult(true, \u0026#34;process success\u0026#34;); } } 并行处理器MapReduceProcessor MapReduce是最复杂也是最强大的一种执行器它允许开发者完成任务的拆分将子任务派发到集群中其他Worker执行是执行大批量处理任务的不二之选实现MapReduce处理器需要继承MapReduceProcessor类具体用法如下示例代码所示\n@Component public class MapReduceProcessorDemo extends MapReduceProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { // 判断是否为根任务 if (isRootTask()) { // 构造子任务 List\u0026lt;SubTask\u0026gt; subTaskList = Lists.newLinkedList(); /* * 子任务的构造由开发者自己定义 * eg. 现在需要从文件中读取100W个ID并处理数据库中这些ID对应的数据那么步骤如下 * 1. 根任务RootTask读取文件流式拉取100W个ID并按1000个一批的大小组装成子任务进行派发 * 2. 非根任务获取子任务,完成业务逻辑的处理 */ // 调用 map 方法,派发子任务 return map(subTaskList, \u0026#34;DATA_PROCESS_TASK\u0026#34;); } // 非子任务,可根据 subTask 的类型 或 TaskName 来判断分支 if (context.getSubTask() instanceof SubTask) { // 执行子任务,注:子任务人可以 map 产生新的子任务,可以构建任意级的 MapReduce 处理器 return new ProcessResult(true, \u0026#34;PROCESS_SUB_TASK_SUCCESS\u0026#34;); } return new ProcessResult(false, \u0026#34;UNKNOWN_BUG\u0026#34;); } @Override public ProcessResult reduce(TaskContext taskContext, List\u0026lt;TaskResult\u0026gt; taskResults) { // 所有 Task 执行结束后reduce 将会被执行 // taskResults 保存了所有子任务的执行结果 // 用法举例,统计执行结果 AtomicLong successCnt = new AtomicLong(0); taskResults.forEach(tr -\u0026gt; { if (tr.isSuccess()) { successCnt.incrementAndGet(); } }); // 该结果将作为任务最终的执行结果 return new ProcessResult(true, \u0026#34;success task num:\u0026#34; + successCnt.get()); } // 自定义的子任务 private static class SubTask { private Long siteId; private List\u0026lt;Long\u0026gt; idList; } } 注Map处理器相当于MapReduce处理器的阉割版本阉割了reduce方法此处不再单独举例。\n最佳实践MapReduce实现静态分片 虽然说这有点傻鸡焉用牛刀的感觉,不过既然目前市场上同类产品都处于静态分片的阶段,我也就在这里给大家举个例子吧~\n@Component public class StaticSliceProcessor extends MapReduceProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { OmsLogger omsLogger = context.getOmsLogger(); // root task 负责分发任务 if (isRootTask()) { // 从控制台传递分片参数架设格式为KV1=a\u0026amp;2=b\u0026amp;3=c String jobParams = context.getJobParams(); Map\u0026lt;String, String\u0026gt; paramsMap = Splitter.on(\u0026#34;\u0026amp;\u0026#34;).withKeyValueSeparator(\u0026#34;=\u0026#34;).split(jobParams); List\u0026lt;SubTask\u0026gt; subTasks = Lists.newLinkedList(); paramsMap.forEach((k, v) -\u0026gt; subTasks.add(new SubTask(Integer.parseInt(k), v))); return map(subTasks, \u0026#34;SLICE_TASK\u0026#34;); } Object subTask = context.getSubTask(); if (subTask instanceof SubTask) { // 实际处理 // 当然,如果觉得 subTask 还是很大,也可以继续分发哦 return new ProcessResult(true, \u0026#34;subTask:\u0026#34; + ((SubTask) subTask).getIndex() + \u0026#34; process successfully\u0026#34;); } return new ProcessResult(false, \u0026#34;UNKNOWN BUG\u0026#34;); } @Override public ProcessResult reduce(TaskContext context, List\u0026lt;TaskResult\u0026gt; taskResults) { // 按需求做一些统计工作... 不需要的话,直接使用 Map 处理器即可 return new ProcessResult(true, \u0026#34;xxxx\u0026#34;); } @Getter @NoArgsConstructor @AllArgsConstructor private static class SubTask { private int index; private String params; } } 最佳实践MapReduce多级分发处理 利用MapReduce实现 Root -\u0026gt; A -\u0026gt; B/C -\u0026gt; Reduce的DAG 工作流。\n@Component public class DAGSimulationProcessor extends MapReduceProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { if (isRootTask()) { // L1. 执行根任务 // 执行完毕后产生子任务 A需要传递的参数可以作为 TaskA 的属性进行传递 TaskA taskA = new TaskA(); return map(Lists.newArrayList(taskA), \u0026#34;LEVEL1_TASK_A\u0026#34;); } if (context.getSubTask() instanceof TaskA) { // L2. 执行A任务 // 执行完成后产生子任务 BC并行执行 TaskB taskB = new TaskB(); TaskC taskC = new TaskC(); return map(Lists.newArrayList(taskB, taskC), \u0026#34;LEVEL2_TASK_BC\u0026#34;); } if (context.getSubTask() instanceof TaskB) { // L3. 执行B任务 return new ProcessResult(true, \u0026#34;xxx\u0026#34;); } if (context.getSubTask() instanceof TaskC) { // L3. 执行C任务 return new ProcessResult(true, \u0026#34;xxx\u0026#34;); } return new ProcessResult(false, \u0026#34;UNKNOWN_TYPE_OF_SUB_TASK\u0026#34;); } @Override public ProcessResult reduce(TaskContext context, List\u0026lt;TaskResult\u0026gt; taskResults) { // L4. 执行最终 Reduce 任务taskResults保存了之前所有任务的结果 taskResults.forEach(taskResult -\u0026gt; { // do something... }); return new ProcessResult(true, \u0026#34;reduce success\u0026#34;); } private static class TaskA { } private static class TaskB { } private static class TaskC { } } 更多示例 没看够更多示例请见oh-my-scheduler-worker-samples\n"});index.add({'id':7,'href':'/docs/startup/4-console-guide/','title':"任务管理与在线运维",'content':"前端控制台允许开发者可视化地进行任务增、删、改、查等管理操作,同时也能直观地看到任务的运行数据,包括运行状态、详情和在线日志等。以下为对控制台的详细介绍: 主页 展示了系统整体的概览和集群Worker列表。\n任务创建 创建需要被调度执行的任务,入口为主页 -\u0026gt; 任务管理 -\u0026gt; 新建任务。\n 任务名称:名称,便于记忆与搜索,无特殊用途,请尽量简短(占用数据库字段空间)\n 任务描述:描述,无特殊作用,请尽量简短(占用数据库字段空间)\n 任务参数任务处理时能够获取到的参数即各个Processor的process方法入参TaskContext对象的jobParams字段进行一次处理器开发就能理解了\n 定时信息:由下拉框和输入框组成\n API -\u0026gt; 不需要填写任何参数(填了也不起作用) CRON -\u0026gt; 填写 CRON 表达式(可以找个在线生成网站生成) 固定频率 -\u0026gt; 填写整数,单位毫秒 固定延迟 -\u0026gt; 填写整数,单位毫秒 执行配置由执行类型单机、广播和MapReduce、处理器类型和处理器参数组成后两项相互关联。\n 内置Java处理器 -\u0026gt; 填写该处理器的全限定类名eg, com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo Java容器 -\u0026gt; 填写容器ID#处理器全限定类名eg1#com.github.kfcfans.oms.container.DemoProcessor SHELL -\u0026gt; 填写需要处理的脚本直接复制文件内容或脚本下载连接http://xxx PYTHON -\u0026gt; 填写完整的python脚本或下载连接http://xxx 运行配置\n 最大实例数:该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例) 单机线程并发数该实例执行过程中每个Worker使用的线程数量MapReduce任务生效其余无论填什么都只会使用1个线程或3个线程\u0026hellip; 运行时间限制限定任务的最大运行时间超时则视为失败单位毫秒0代表不限制超时时间。 重试配置:\n 任务重试次数实例级别失败了整个任务实例重试会更换TaskTracker本次任务实例的Master节点代价较大大型Map/MapReduce慎用。 子任务重试次数Task级别每个子Task失败后单独重试会更换ProcessorTracker本次任务实际执行的Worker节点代价较小推荐使用。 注对于单机任务来说假如任务重试次数和子任务重试次数都配置了1且都执行失败实际执行次数会变成4次推荐任务实例重试配置为0子任务重试次数根据实际情况配置。 机器配置用来标明允许执行任务的机器状态避开那些摇摇欲坠的机器0代表无任何限制。\n 最低CPU核心数填写浮点数CPU可用核心数小于该值的Worker将不会执行该任务。 最低内存GB填写浮点数可用内存小于该值的Worker将不会执行该任务。 最低磁盘GB填写浮点数可用磁盘空间小于该值的Worker将不会执行该任务。 集群配置\n 执行机器地址指定集群中的某几台机器执行任务debug的好帮手多值英文逗号分割如192.168.1.1:27777,192.168.1.2:27777 最大执行机器数量:限定调动执行的机器数量 报警配置:选择任务执行失败后报警通知的对象,需要事先录入。\n 任务管理 直观地展示当前系统所管理的所有任务信息,并提供相应的运维方法。\n运行状态 直观地展示当前系统中运行任务实例的状态点击详情即可获取详细的信息点击日志可以查看通过omsLogger上报的日志点击停止则可以强制终止该任务。\n在线日志 在线查看Worker执行过程中上报的日志极大降低debug成本提升开发效率\n"});index.add({'id':8,'href':'/docs/version/','title':"更新日志",'content':""});index.add({'id':9,'href':'/docs/','title':"Docs",'content':""});})();