mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
1 line
36 KiB
JavaScript
1 line
36 KiB
JavaScript
'use strict';(function(){const indexCfg={cache:true};indexCfg.doc={id:'id',field:['title','content'],store:['title','href'],};const index=FlexSearch.create('balance',indexCfg);window.bookSearchIndex=index;index.add({'id':0,'href':'/ohmyscheduler/docs/super/container/','title':"容器",'content':"什么是容器? 介绍 OhMyScheduler的容器技术允许开发者开发独立于Worker项目之外Java处理器,简单来说,就是以Maven工程项目的维度去组织一堆Java文件(开发者开发的众多脚本处理器),进而兼具开发效率和可维护性。\n用途举例 比如,突然出现了某个数据库数据清理任务,与主业务无关,写进原本的项目工程中不太优雅,这时候就可以单独创建一个用于数据操作的容器,在里面完成处理器的开发,通过OhMyScheduler的容器部署技术在Worker集群上被加载执行。 比如,常见的日志清理啊,机器状态上报啊,对于广大Java程序员来说,也许并不是很会写shell脚本,此时也可以借用agent+容器技术,利用Java完成各项原本需要通过脚本进行的操作。 (感觉例子举的都不是很好\u0026hellip;这个东西嘛,只可意会不可言传,大家努力理解一下吧~超好用哦~)\n生成容器模版 为了方便开发者使用,最新版本的前端页面已经支持容器工程模版的自动生成,开发者仅需要填入相关信息即可下载容器模版开始开发。 Group:对应Maven的\u0026lt;groupId\u0026gt;标签,一般填入倒写的公司域名。 Artifact:对于Maven的\u0026lt;artifactId\u0026gt;标签,填入代表该容器的唯一标示。 Name:对应Maven的\u0026lt;name\u0026gt;标签,填入该容器名称。 Package Name:包名,代表了的容器工程内部所使用的包名,警告:包名一旦生成后,请勿更改!否则会导致运行时容器加载错误(当然,如有必须修改包名的需求,可以尝试替换/resource下以oms-worker-container开头的所有文件相关的值)。 Java Version:容器工程的Java版本,请务必与容器目标部署Worker平台的Java版本保持一致。 开发容器工程 完成容器模版创建后,下载,解压,会得到如下结构的Java工程:\noms-template-origin // 工程名称,可以自由更改 ├── pom.xml └── src ├── main │ ├── java │ │ └── cn │ │ └── edu │ │ └── zju │ │ └── tjq │ │ └── container │ │ └── samples // 所有处理器代码必须位于该目录下,其余类随意 │ └── resources // 严禁随意更改以下两个配置文件(允许添加,不允许更改现有内容) │ ├── oms-worker-container-spring-context.xml │ └── oms-worker-container.properties └── test └── java 之后便可以愉快地在packageName下编写处理器代码啦~\n需要示例代码?客官这边请~\n创建容器 目前,OhMyScheduler支持使用Git代码库和FatJar来创建容器。创建路径:容器运维 -\u0026gt; 容器管理 -\u0026gt; 新建容器。\n当使用Git代码库创建容器时,OhMyScheduler-Server需要完成代码库的下载、编译、构建和上传,因此需要server运行环境包含可用的Git和Maven环境(包括私服的访问权限)。 下图为使用Git代码库创建容器的示例,需要填入容器名称和代码库信息等参数:\n下图为使用FatJar创建容器的示例,需要上传可用的FatJar(注:FatJar值包含了所有依赖的Jar文件):\n部署容器 完成容器创建后,便可在容器管理界面查看已创建的容器,点击部署,可以看到详细的部署信息。 部署完成后,可以点击机器列表查看已部署该容器的机器信息。 "});index.add({'id':1,'href':'/ohmyscheduler/docs/startup/','title':"快速开始",'content':""});index.add({'id':2,'href':'/ohmyscheduler/docs/startup/1-server-startup/','title':"调度中心(Server)部署",'content':"环境要求 Open JDK 8+\n Apache Maven 3+\n 任意 Spring Data Jpa 支持的关系型数据库(MySQL/Oracle/MS SQLServer\u0026hellip;)\n MongoDB(可选),任意支持GridFS的mongoDB版本(4.2.6测试通过,其余未经测试,仅从理论角度分析可用),缺失该组件的情况下将无法使用在线日志、容器部署等扩展功能\n 初始化关系数据库 调度服务器(oh-my-scheduler-server)的持久化层基于Spring Boot Jpa实现,对于能够直连数据库的应用,开发者仅需完成数据库的创建,即运行SQL:CREATE database if NOT EXISTS oms-product default character set utf8mb4 collate utf8mb4_unicode_ci;`\n OhMyScheduler支持环境隔离,提供日常(daily)、预发(pre)和线上(product)三套环境,请根据使用的环境分别部署对应的数据库:oms-daily、oms-pre和oms-product。 调度服务器属于时间敏感应用,强烈建议检查当前数据库所使用的时区信息(show variables like \u0026quot;%time_zone%\u0026quot;;),务必确保time_zone代表的时区与JDBC URL中serverTimezone字段代表的时区一致! 手动建表表SQL文件:下载地址 部署调度服务器—源码编译 调度服务器(oh-my-scheduler-server)支持任意的水平扩展,即多实例集群部署仅需要在同一个局域网内启动新的服务器实例,性能强劲无上限!\n 调度服务器(oh-my-scheduler-server)为了支持环境隔离,分别采用了日常(application-daily.properties)、预发(application-pre.properties)和线上(application-product.properties)三套配置文件,请根据实际需求进行修改,以下为配置文件详解。\n 配置项 含义 可选 server.port SpringBoot配置,HTTP端口号,默认7700 否 oms.akka.port OhMyScheduler配置,Akka端口号,默认10086 否 oms.alarm.bean.names OhMyScheduler报警服务Bean名称,多值逗号分隔 是 spring.datasource.core.xxx 关系型数据库连接配置 否 spring.mail.xxx 邮件配置 是,未配置情况下将无法使用邮件报警功能 spring.data.mongodb.xxx MongoDB连接配置 是,未配置情况下将无法使用在线日志功能 oms.log.retention.local 本地日志保留天数,负数代表永久保留 否 oms.log.retention.remote 远程日志保留天数,负数代表永久保留 否 oms.container.retention.local 扩展的报警服务Bean,多值逗号分割,默认为邮件报警 否 oms.container.retention.remote 远程容器保留天数,负数代表永久保留 否 完成配置文件修改后,即可正式开始部署:\n 打包:运行mvn clean package -U -Pdev -DskipTests,构建调度中心Jar文件。 运行:运行java -jar oms-server.jar --spring.profiles.active=product,指定生效的配置文件。 验证:访问http://ip:port查看是否出现OhMyScheduler的欢迎页面。 部署调度服务器—Docker 建议自己根据项目中的Dockerfile稍作修改,制作自己的Docker镜像,而不是直接使用官方镜像!原因在于:容器功能需要用到Git和Maven来编译代码库,而公司内部往往都会搭建自己的私有仓库,所以Git容器功能没办法正常运行(即,官方镜像中的调度服务器不支持Git容器的部署)。 Docker Hub地址\n部署流程:\n 下载镜像:docker pull tjqq/oms-server 创建容器并运行(所有SpringBoot的启动参数都可通过-e Params=\u0026quot;\u0026quot;传入) docker run -d -e PARAMS=\u0026#34;--spring.profiles.active=product\u0026#34; -p 7700:7700 -p 10086:10086 -p 27777:27777 --name oms-server -v ~/docker/oms-server:/root/oms-server tjqq/oms-server:$version 单独部署前端页面(可选) 每一个oh-my-scheduler-server都自带了前端页面,不过Tomcat(为了完善的WebSocket支持,现已切换到Undertow)做Web服务器的性能就呵呵了(看评测好像还行,不过有追求的用户还是建议单独使用源码部署)~\n 源码克隆:OhMyScheduler-Console 替换地址:修改main.js中的axios.defaults.baseURL为服务器地址 npm run build -\u0026gt; nginx config 特别鸣谢:感谢某知名上市电商公司前端开发者对本项目的大力支持!\n初始化应用分组 每一个业务系统初次接入OhMyScheduler时,都需要先完成应用注册。\n 应用注册,用于进行业务分组: 应用名称:关键参数,一般填入接入的业务应用名称即可,需要保证唯一。同一个应用名称的所有worker视为一个集群被调度中心调度。 应用描述:可选参数,便于记忆,无实际用处。 用户注册,用于收集报警信息,用户注册录入个人信息后,即可通过报警配置进行通知。 "});index.add({'id':3,'href':'/ohmyscheduler/docs/super/openapi/','title':"OpenAPI",'content':"OpenAPI允许开发者通过接口来完成手工的操作,让系统整体变得更加灵活。开发者可以基于API便捷地扩展OhMyScheduler原有的功能。 依赖 最新依赖版本请参考Maven中央仓库:推荐地址\u0026amp;备用地址。\n\u0026lt;dependency\u0026gt; \u0026lt;groupId\u0026gt;com.github.kfcfans\u0026lt;/groupId\u0026gt; \u0026lt;artifactId\u0026gt;oh-my-scheduler-client\u0026lt;/artifactId\u0026gt; \u0026lt;version\u0026gt;1.2.0\u0026lt;/version\u0026gt; \u0026lt;/dependency\u0026gt; 简单示例 // 初始化 client,需要server地址和应用名称作为参数 OhMyClient ohMyClient = new OhMyClient(\u0026#34;127.0.0.1:7700\u0026#34;, \u0026#34;oms-test\u0026#34;); // 调用相关的API ohMyClient.stopInstance(1586855173043L) API列表 创建/修改任务 接口签名:ResultDTO\u0026lt;Long\u0026gt; saveJob(ClientJobInfo newJobInfo)\n入参:任务信息(详细说明见下表,也可以参考前端任务创建各参数的正确填法)\n返回值:ResultDTO,根据success判断操作是否成功。若操作成功,data字段返回任务ID\n 属性 说明 jobId 任务ID,可选,null代表创建任务,否则填写需要修改的任务ID jobName 任务名称 jobDescription 任务描述 jobParams 任务参数,Processor#process方法入参TaskContext对象的jobParams字段 timeExpressionType 时间表达式类型,枚举值 timeExpression 时间表达式,填写类型由timeExpressionType决定,比如CRON需要填写CRON表达式 executeType 执行类型,枚举值 processorType 处理器类型,枚举值 processorInfo 处理器参数,填写类型由processorType决定,如Java处理器需要填写全限定类名,如:com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo maxInstanceNum 最大实例数,该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例) concurrency 单机线程并发数,表示该实例执行过程中每个Worker使用的线程数量 instanceTimeLimit 任务实例运行时间限制,0代表无任何限制,超时会被打断并判定为执行失败 instanceRetryNum 任务实例重试次数,整个任务失败时重试,代价大,不推荐使用 taskRetryNum Task重试次数,每个子Task失败后单独重试,代价小,推荐使用 minCpuCores 最小可用CPU核心数,CPU可用核心数小于该值的Worker将不会执行该任务,0代表无任何限制 minMemorySpace 最小内存大小(GB),可用内存小于该值的Worker将不会执行该任务,0代表无任何限制 minDiskSpace 最小磁盘大小(GB),可用磁盘空间小于该值的Worker将不会执行该任务,0代表无任何限制 designatedWorkers 指定机器执行,设置该参数后只有列表中的机器允许执行该任务,0代表无任何限制 maxWorkerCount 最大执行机器数量,限定调动执行的机器数量,空代表无限制 notifyUserIds 接收报警的用户ID列表 enable 是否启用该任务,未启用的任务不会被调度 查找任务 接口签名:ResultDTO\u0026lt;JobInfoDTO\u0026gt; fetchJob(Long jobId)\n入参:任务ID\n返回值:根据success判断操作是否成功,若请求成功则返回任务的详细信息\n禁用某个任务 接口签名:ResultDTO\u0026lt;Void\u0026gt; disableJob(Long jobId)\n入参:任务ID\n返回值:根据success判断操作是否成功\n启用某个任务 接口签名:ResultDTO\u0026lt;Void\u0026gt; enableJob(Long jobId)\n入参:任务ID\n返回值:根据success判断操作是否成功\n删除某个任务 接口签名:ResultDTO\u0026lt;Void\u0026gt; deleteJob(Long jobId)\n入参:任务ID\n返回值:根据success判断操作是否成功\n立即运行某个任务 接口签名:ResultDTO\u0026lt;Long\u0026gt; runJob(Long jobId, String instanceParams)\n入参:任务ID + 任务实例参数(Processor#process方法入参TaskContext对象的instanceParams字段)\n返回值:根据success判断操作是否成功,操作成功返回对应的任务实例ID(instanceId)\n停止某个任务实例 接口签名:ResultDTO\u0026lt;Void\u0026gt; stopInstance(Long instanceId)\n入参:任务实例ID\n返回值:根据success判断操作是否成功\n查询某个任务实例 接口签名:ResultDTO\u0026lt;InstanceInfoDTO\u0026gt; fetchInstanceInfo(Long instanceId)\n入参:任务实例ID\n返回值:根据success判断操作是否成功,操作成功返回任务实例的详细信息\n查询某个任务实例的状态 接口签名:ResultDTO\u0026lt;Integer\u0026gt; fetchInstanceStatus(Long instanceId)\n入参:任务实例ID\n返回值:根据success判断操作是否成功,操作成功返回任务实例的状态码,对应的枚举为:InstanceStatus\n"});index.add({'id':4,'href':'/ohmyscheduler/docs/startup/2-worker-startup/','title':"执行器(Worker)初始化",'content':"基于宿主应用的执行器初始化 宿主应用即原有的业务应用,假如需要调度执行的任务与当前业务有较为紧密的联系,建议采取该方式。\n 首先,添加相关的jar包依赖,最新依赖版本请参考maven中央仓库:推荐地址\u0026amp;备用地址\n\u0026lt;dependency\u0026gt; \u0026lt;groupId\u0026gt;com.github.kfcfans\u0026lt;/groupId\u0026gt; \u0026lt;artifactId\u0026gt;oh-my-scheduler-worker\u0026lt;/artifactId\u0026gt; \u0026lt;version\u0026gt;1.2.0\u0026lt;/version\u0026gt; \u0026lt;/dependency\u0026gt; 其次,填写执行器客户端配置文件OhMyConfig,各参数说明如下表所示:\n 属性名称 含义 默认值 appName 宿主应用名称,需要提前在控制台完成注册 无,必填项,否则启动报错 port Worker工作端口 27777 serverAddress 调度中心(oh-my-scheduler-server)地址列表 无,必填项,否则启动报错 storeStrategy 本地存储策略,枚举值磁盘/内存,大型MapReduce等会产生大量Task的任务推荐使用磁盘降低内存压力,否则建议使用内存加速计算 StoreStrategy.DISK(磁盘) maxResultLength 每个Task返回结果的默认长度,超长将被截断。过长可能导致网络拥塞 8096 enableTestMode 是否启用测试模式,启用后无需Server也能顺利启动OhMyScheduler-Worker,用于处理器本地的单元测试 false 最后,初始化客户端,完成执行器的启动,代码示例如下:\n@Configuration public class OhMySchedulerConfig { @Bean public OhMyWorker initOMS() throws Exception { // 服务器HTTP地址(端口号为 server.port,而不是 ActorSystem port) List\u0026lt;String\u0026gt; serverAddress = Lists.newArrayList(\u0026#34;127.0.0.1:7700\u0026#34;, \u0026#34;127.0.0.1:7701\u0026#34;); // 1. 创建配置文件 OhMyConfig config = new OhMyConfig(); config.setPort(27777); config.setAppName(\u0026#34;oms-test\u0026#34;); config.setServerAddress(serverAddress); // 如果没有大型 Map/MapReduce 的需求,建议使用内存来加速计算 // 为了本地模拟多个实例,只能使用 MEMORY 启动(文件只能由一个应用占有) config.setStoreStrategy(StoreStrategy.MEMORY); // 2. 创建 Worker 对象,设置配置文件 OhMyWorker ohMyWorker = new OhMyWorker(); ohMyWorker.setConfig(config); return ohMyWorker; } } 非Spring应用程序在创建OhMyWorker对象后手动调用ohMyWorker.init()方法完成初始化即可。\n OhMyScheduler日志单独配置\n目前,OhMyScheduler-Worker并没有实现自己的LogFactory(如果有需求的话请提ISSUE,可以考虑实现),原因如下:\n OhMyScheduler-Worker的日志基于Slf4J输出,即采用了基于门面设计模式的日志框架,宿主应用无论如何都可以搭起Slf4J与实际的日志框架这座桥梁。 减轻了部分开发工作量,不再需要实现自己的LogFactory(虽然不怎么难就是了\u0026hellip;)。 为此,为了顺利且友好地输出日志,请在日志配置文件(logback.xml/log4j2.xml/\u0026hellip;)中为OhMyScheduler-Worker单独进行日志配置,比如(logback示例):\n\u0026lt;appender name=\u0026#34;OMS_WORKER_APPENDER\u0026#34; class=\u0026#34;ch.qos.logback.core.rolling.RollingFileAppender\u0026#34;\u0026gt; \u0026lt;file\u0026gt;${LOG_PATH}/oms-worker.log\u0026lt;/file\u0026gt; \u0026lt;rollingPolicy class=\u0026#34;ch.qos.logback.core.rolling.TimeBasedRollingPolicy\u0026#34;\u0026gt; \u0026lt;FileNamePattern\u0026gt;${LOG_PATH}/oms-worker.%d{yyyy-MM-dd}.log\u0026lt;/FileNamePattern\u0026gt; \u0026lt;MaxHistory\u0026gt;7\u0026lt;/MaxHistory\u0026gt; \u0026lt;/rollingPolicy\u0026gt; \u0026lt;encoder class=\u0026#34;ch.qos.logback.classic.encoder.PatternLayoutEncoder\u0026#34;\u0026gt; \u0026lt;pattern\u0026gt;%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n\u0026lt;/pattern\u0026gt; \u0026lt;charset\u0026gt;UTF-8\u0026lt;/charset\u0026gt; \u0026lt;/encoder\u0026gt; \u0026lt;append\u0026gt;true\u0026lt;/append\u0026gt; \u0026lt;/appender\u0026gt; \u0026lt;logger name=\u0026#34;com.github.kfcfans.oms.worker\u0026#34; level=\u0026#34;INFO\u0026#34; additivity=\u0026#34;false\u0026#34;\u0026gt; \u0026lt;appender-ref ref=\u0026#34;OMS_WORKER_APPENDER\u0026#34; /\u0026gt; \u0026lt;/logger\u0026gt; 无论如何,OhMyScheduler-Worker启动时都会打印Banner(如下所示),您可以通过Banner来判断日志配置是否成功(emmm\u0026hellip;Markdown显示似乎有点丑,实际上超帅的呢~):\n███████ ██ ████ ████ ████████ ██ ██ ██ ██░░░░░██ ░██ ░██░██ ██░██ ██ ██ ██░░░░░░ ░██ ░██ ░██ ██ ░░██░██ ░██░░██ ██ ░██ ░░██ ██ ░██ █████ ░██ █████ ░██ ██ ██ ░██ █████ ██████ ░██ ░██░██████ ░██ ░░███ ░██ ░░███ ░█████████ ██░░░██░██████ ██░░░██ ██████░██ ░██ ░██ ██░░░██░░██░░█ ░██ ░██░██░░░██░██ ░░█ ░██ ░██ ░░░░░░░░██░██ ░░ ░██░░░██░███████ ██░░░██░██ ░██ ░██░███████ ░██ ░ ░░██ ██ ░██ ░██░██ ░ ░██ ██ ░██░██ ██░██ ░██░██░░░░ ░██ ░██░██ ░██ ░██░██░░░░ ░██ ░░███████ ░██ ░██░██ ░██ ██ ████████ ░░█████ ░██ ░██░░██████░░██████░░██████ ███░░██████░███ ░░░░░░░ ░░ ░░ ░░ ░░ ░░ ░░░░░░░░ ░░░░░ ░░ ░░ ░░░░░░ ░░░░░░ ░░░░░░ ░░░ ░░░░░░ ░░░ 基于agent的执行器初始化 agent是一个没有任何业务逻辑的执行器(其实就是为worker加了一个main方法)。\n 代码编译方式启动示例:java -jar oh-my-scheduler-worker-agent-1.2.0.jar -a my-agent:\nUsage: OhMyAgent [-hV] -a=\u0026lt;appName\u0026gt; [-e=\u0026lt;storeStrategy\u0026gt;] [-l=\u0026lt;length\u0026gt;] [-p=\u0026lt;port\u0026gt;] [-s=\u0026lt;server\u0026gt;] OhMyScheduler-Worker代理 -a, --app=\u0026lt;appName\u0026gt; worker-agent名称,可通过调度中心控制台创建 -e, --persistence=\u0026lt;storeStrategy\u0026gt; 存储策略,枚举值,DISK 或 MEMORY -h, --help Show this help message and exit. -l, --length=\u0026lt;length\u0026gt; 返回值最大长度 -p, --port=\u0026lt;port\u0026gt; worker-agent的ActorSystem监听端口,不建议更改 -s, --server=\u0026lt;server\u0026gt; 调度中心地址,多值英文逗号分隔,格式 IP:Port OR domain -V, --version Print version information and exit. Docker镜像:Docker Hub\n"});index.add({'id':5,'href':'/ohmyscheduler/docs/super/','title':"高级特性",'content':""});index.add({'id':6,'href':'/ohmyscheduler/docs/startup/3-processor-develop/','title':"处理器开发",'content':"处理器概述 OhMyScheduler当前支持Shell、Python等脚本处理器和Java处理器。脚本处理器只需要开发者完成脚本的编写(xxx.sh / xxx.py),在控制台填入脚本内容即可,本章不再赘述。本章将重点阐述Java处理器开发方法与使用技巧。\n Java处理器可根据代码所处位置划分为内置Java处理器和容器Java处理器,前者直接集成在宿主应用(也就是接入本系统的业务应用)中,一般用来处理业务需求;后者可以在一个独立的轻量级的Java工程中开发,通过容器技术(详见容器章节)被worker集群热加载,提供Java的“脚本能力”,一般用于处理灵活多变的需求。 Java处理器可根据对象创建者划分为SpringBean处理器和普通Java对象处理器,前者由Spring IOC容器完成处理器的创建和初始化,后者则有OhMyScheduler维护其状态。如果宿主应用支持Spring,强烈建议使用SpringBean处理器,开发者仅需要将Processor注册进Spring IOC容器(一个@Component注解或一句bean配置)。 Java处理器可根据功能划分为单机处理器、广播处理器、Map处理器和MapReduce处理器。 单机处理器(BasicProcessor)对应了单机任务,即某个任务的某次运行只会有某一台机器的某一个线程参与运算。 广播处理器(BroadcastProcessor)对应了广播任务,即某个任务的某次运行会调动集群内所有机器参与运算。 Map处理器(MapProcessor)对应了Map任务,即某个任务在运行过程中,允许产生子任务并分发到其他机器进行运算。 MapReduce处理器(MapReduceProcessor)对应了MapReduce任务,在Map任务的基础上,增加了所有任务结束后的汇总统计。 核心方法:process 任意Java处理器都需要实现处理的核心方法,其接口签名如下:\nProcessResult process(TaskContext context) throws Exception; 方法入参TaskContext,包含了本次处理的上下文信息,具体属性如下:\n 属性名称 意义/用法 jobId 任务ID,开发者一般无需关心此参数 instanceId 任务实例ID,全局唯一,开发者一般无需关心此参数 subInstanceId 子任务实例ID,秒级任务使用,开发者一般无需关心此参数 taskId 采用链式命名法的ID,在某个任务实例内唯一,开发者一般无需关心此参数 taskName task名称,Map/MapReduce任务的子任务的值为开发者指定,否则为系统默认值,开发者一般无需关心此参数 jobParams 任务参数,其值等同于控制台录入的任务参数,常用! instanceParams 任务实例参数,其值等同于使用OpenAPI运行任务实例时传递的参数,常用! maxRetryTimes Task的最大重试次数 currentRetryTimes Task的当前重试次数,和maxRetryTimes联合起来可以判断当前是否为该Task的最后一次运行机会 subTask 子Task,Map/MapReduce处理器专属,开发者调用map方法时传递的子任务列表中的某一个 omsLogger 在线日志,用法同Slf4J,记录的日志可以直接通过控制台查看,非常便捷和强大!不过使用过程中需要注意频率,可能对Server造成巨大的压力 方法的返回值为ProcessResult,代表了本次Task执行的结果,包含success和msg两个属性,分别用于传递Task是否执行成功和Task需要返回的信息。\n单机处理器:BasicProcessor 单机执行的策略下,server会在所有可用worker中选取健康度最佳的机器进行执行。单机执行任务需要实现接口:BasicProcessor,代码示例如下:\n// 支持 SpringBean 的形式 @Component public class BasicProcessorDemo implements BasicProcessor { @Resource private MysteryService mysteryService; @Override public ProcessResult process(TaskContext context) throws Exception { // 在线日志功能,可以直接在控制台查看任务日志,非常便捷 OmsLogger omsLogger = context.getOmsLogger(); omsLogger.info(\u0026#34;BasicProcessorDemo start to process, current JobParams is {}.\u0026#34;, context.getJobParams()); // TaskContext为任务的上下文信息,包含了在控制台录入的任务元数据,常用字段为 // jobParams(任务参数,在控制台录入),instanceParams(任务实例参数,通过 OpenAPI 触发的任务实例才可能存在该参数) // 进行实际处理... mysteryService.hasaki(); // 返回结果,该结果会被持久化到数据库,在前端页面直接查看,极为方便 return new ProcessResult(true, \u0026#34;result is xxx\u0026#34;); } } 广播处理器:BroadcastProcessor 广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在BasicProcessor的基础上额外增加了preProcess和postProcess方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。代码示例如下:\n@Component public class BroadcastProcessorDemo extends BroadcastProcessor { @Override public ProcessResult preProcess(TaskContext taskContext) throws Exception { // 预执行,会在所有 worker 执行 process 方法前调用 return new ProcessResult(true, \u0026#34;init success\u0026#34;); } @Override public ProcessResult process(TaskContext context) throws Exception { // 撰写整个worker集群都会执行的代码逻辑 return new ProcessResult(true, \u0026#34;release resource success\u0026#34;); } @Override public ProcessResult postProcess(TaskContext taskContext, List\u0026lt;TaskResult\u0026gt; taskResults) throws Exception { // taskResults 存储了所有worker执行的结果(包括preProcess) // 收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果 return new ProcessResult(true, \u0026#34;process success\u0026#34;); } } 并行处理器:MapReduceProcessor MapReduce是最复杂也是最强大的一种执行器,它允许开发者完成任务的拆分,将子任务派发到集群中其他Worker执行,是执行大批量处理任务的不二之选!实现MapReduce处理器需要继承MapReduceProcessor类,具体用法如下示例代码所示:\n@Component public class MapReduceProcessorDemo extends MapReduceProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { // 判断是否为根任务 if (isRootTask()) { // 构造子任务 List\u0026lt;SubTask\u0026gt; subTaskList = Lists.newLinkedList(); /* * 子任务的构造由开发者自己定义 * eg. 现在需要从文件中读取100W个ID,并处理数据库中这些ID对应的数据,那么步骤如下: * 1. 根任务(RootTask)读取文件,流式拉取100W个ID,并按1000个一批的大小组装成子任务进行派发 * 2. 非根任务获取子任务,完成业务逻辑的处理 */ // 调用 map 方法,派发子任务 return map(subTaskList, \u0026#34;DATA_PROCESS_TASK\u0026#34;); } // 非子任务,可根据 subTask 的类型 或 TaskName 来判断分支 if (context.getSubTask() instanceof SubTask) { // 执行子任务,注:子任务人可以 map 产生新的子任务,可以构建任意级的 MapReduce 处理器 return new ProcessResult(true, \u0026#34;PROCESS_SUB_TASK_SUCCESS\u0026#34;); } return new ProcessResult(false, \u0026#34;UNKNOWN_BUG\u0026#34;); } @Override public ProcessResult reduce(TaskContext taskContext, List\u0026lt;TaskResult\u0026gt; taskResults) { // 所有 Task 执行结束后,reduce 将会被执行 // taskResults 保存了所有子任务的执行结果 // 用法举例,统计执行结果 AtomicLong successCnt = new AtomicLong(0); taskResults.forEach(tr -\u0026gt; { if (tr.isSuccess()) { successCnt.incrementAndGet(); } }); // 该结果将作为任务最终的执行结果 return new ProcessResult(true, \u0026#34;success task num:\u0026#34; + successCnt.get()); } // 自定义的子任务 private static class SubTask { private Long siteId; private List\u0026lt;Long\u0026gt; idList; } } 注:Map处理器相当于MapReduce处理器的阉割版本(阉割了reduce方法),此处不再单独举例。\n最佳实践:MapReduce实现静态分片 虽然说这有点傻鸡焉用牛刀的感觉,不过既然目前市场上同类产品都处于静态分片的阶段,我也就在这里给大家举个例子吧~\n@Component public class StaticSliceProcessor extends MapReduceProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { OmsLogger omsLogger = context.getOmsLogger(); // root task 负责分发任务 if (isRootTask()) { // 从控制台传递分片参数,架设格式为KV:1=a\u0026amp;2=b\u0026amp;3=c String jobParams = context.getJobParams(); Map\u0026lt;String, String\u0026gt; paramsMap = Splitter.on(\u0026#34;\u0026amp;\u0026#34;).withKeyValueSeparator(\u0026#34;=\u0026#34;).split(jobParams); List\u0026lt;SubTask\u0026gt; subTasks = Lists.newLinkedList(); paramsMap.forEach((k, v) -\u0026gt; subTasks.add(new SubTask(Integer.parseInt(k), v))); return map(subTasks, \u0026#34;SLICE_TASK\u0026#34;); } Object subTask = context.getSubTask(); if (subTask instanceof SubTask) { // 实际处理 // 当然,如果觉得 subTask 还是很大,也可以继续分发哦 return new ProcessResult(true, \u0026#34;subTask:\u0026#34; + ((SubTask) subTask).getIndex() + \u0026#34; process successfully\u0026#34;); } return new ProcessResult(false, \u0026#34;UNKNOWN BUG\u0026#34;); } @Override public ProcessResult reduce(TaskContext context, List\u0026lt;TaskResult\u0026gt; taskResults) { // 按需求做一些统计工作... 不需要的话,直接使用 Map 处理器即可 return new ProcessResult(true, \u0026#34;xxxx\u0026#34;); } @Getter @NoArgsConstructor @AllArgsConstructor private static class SubTask { private int index; private String params; } } 最佳实践:MapReduce多级分发处理 利用MapReduce实现 Root -\u0026gt; A -\u0026gt; B/C -\u0026gt; Reduce)的DAG 工作流。\n@Component public class DAGSimulationProcessor extends MapReduceProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { if (isRootTask()) { // L1. 执行根任务 // 执行完毕后产生子任务 A,需要传递的参数可以作为 TaskA 的属性进行传递 TaskA taskA = new TaskA(); return map(Lists.newArrayList(taskA), \u0026#34;LEVEL1_TASK_A\u0026#34;); } if (context.getSubTask() instanceof TaskA) { // L2. 执行A任务 // 执行完成后产生子任务 B,C(并行执行) TaskB taskB = new TaskB(); TaskC taskC = new TaskC(); return map(Lists.newArrayList(taskB, taskC), \u0026#34;LEVEL2_TASK_BC\u0026#34;); } if (context.getSubTask() instanceof TaskB) { // L3. 执行B任务 return new ProcessResult(true, \u0026#34;xxx\u0026#34;); } if (context.getSubTask() instanceof TaskC) { // L3. 执行C任务 return new ProcessResult(true, \u0026#34;xxx\u0026#34;); } return new ProcessResult(false, \u0026#34;UNKNOWN_TYPE_OF_SUB_TASK\u0026#34;); } @Override public ProcessResult reduce(TaskContext context, List\u0026lt;TaskResult\u0026gt; taskResults) { // L4. 执行最终 Reduce 任务,taskResults保存了之前所有任务的结果 taskResults.forEach(taskResult -\u0026gt; { // do something... }); return new ProcessResult(true, \u0026#34;reduce success\u0026#34;); } private static class TaskA { } private static class TaskB { } private static class TaskC { } } 更多示例 没看够?更多示例请见:oh-my-scheduler-worker-samples\n"});index.add({'id':7,'href':'/ohmyscheduler/docs/startup/4-console-guide/','title':"任务管理与在线运维",'content':"前端控制台允许开发者可视化地进行任务增、删、改、查等管理操作,同时也能直观地看到任务的运行数据,包括运行状态、详情和在线日志等。以下为对控制台的详细介绍: 主页 展示了系统整体的概览和集群Worker列表。\n任务创建 创建需要被调度执行的任务,入口为主页 -\u0026gt; 任务管理 -\u0026gt; 新建任务。\n 任务名称:名称,便于记忆与搜索,无特殊用途,请尽量简短(占用数据库字段空间)\n 任务描述:描述,无特殊作用,请尽量简短(占用数据库字段空间)\n 任务参数:任务处理时能够获取到的参数(即各个Processor的process方法入参TaskContext对象的jobParams字段)(进行一次处理器开发就能理解了)\n 定时信息:由下拉框和输入框组成\n API -\u0026gt; 不需要填写任何参数(填了也不起作用) CRON -\u0026gt; 填写 CRON 表达式(可以找个在线生成网站生成) 固定频率 -\u0026gt; 填写整数,单位毫秒 固定延迟 -\u0026gt; 填写整数,单位毫秒 执行配置:由执行类型(单机、广播和MapReduce)、处理器类型和处理器参数组成,后两项相互关联。\n 内置Java处理器 -\u0026gt; 填写该处理器的全限定类名(eg, com.github.kfcfans.oms.processors.demo.MapReduceProcessorDemo) Java容器 -\u0026gt; 填写容器ID#处理器全限定类名(eg,1#com.github.kfcfans.oms.container.DemoProcessor) SHELL -\u0026gt; 填写需要处理的脚本(直接复制文件内容)或脚本下载连接(http://xxx) PYTHON -\u0026gt; 填写完整的python脚本或下载连接(http://xxx) 运行配置\n 最大实例数:该任务同时执行的数量(任务和实例就像是类和对象的关系,任务被调度执行后被称为实例) 单机线程并发数:该实例执行过程中每个Worker使用的线程数量(MapReduce任务生效,其余无论填什么,都只会使用1个线程或3个线程\u0026hellip;) 运行时间限制:限定任务的最大运行时间,超时则视为失败,单位毫秒,0代表不限制超时时间。 重试配置:\n 任务重试次数:实例级别,失败了整个任务实例重试,会更换TaskTracker(本次任务实例的Master节点),代价较大,大型Map/MapReduce慎用。 子任务重试次数:Task级别,每个子Task失败后单独重试,会更换ProcessorTracker(本次任务实际执行的Worker节点),代价较小,推荐使用。 注:对于单机任务来说,假如任务重试次数和子任务重试次数都配置了1且都执行失败,实际执行次数会变成4次!推荐任务实例重试配置为0,子任务重试次数根据实际情况配置。 机器配置:用来标明允许执行任务的机器状态,避开那些摇摇欲坠的机器,0代表无任何限制。\n 最低CPU核心数:填写浮点数,CPU可用核心数小于该值的Worker将不会执行该任务。 最低内存(GB):填写浮点数,可用内存小于该值的Worker将不会执行该任务。 最低磁盘(GB):填写浮点数,可用磁盘空间小于该值的Worker将不会执行该任务。 集群配置\n 执行机器地址:指定集群中的某几台机器执行任务(debug的好帮手),多值英文逗号分割,如192.168.1.1:27777,192.168.1.2:27777 最大执行机器数量:限定调动执行的机器数量 报警配置:选择任务执行失败后报警通知的对象,需要事先录入。\n 任务管理 直观地展示当前系统所管理的所有任务信息,并提供相应的运维方法。\n运行状态 直观地展示当前系统中运行任务实例的状态,点击详情即可获取详细的信息,点击日志可以查看通过omsLogger上报的日志,点击停止则可以强制终止该任务。\n在线日志 在线查看Worker执行过程中上报的日志,极大降低debug成本,提升开发效率!\n"});index.add({'id':8,'href':'/ohmyscheduler/docs/version/','title':"更新日志",'content':""});index.add({'id':9,'href':'/ohmyscheduler/docs/','title':"Docs",'content':""});})(); |