Oozie所支持工作流,工作流定义通过将多个Hadoop Job的定义按照一定的顺序组织起来,然后作为一个整体按照既定的路径运行。一个工作流已经定义了,通过启动该工作流Job,就会执行该工作流中包含的多个Hadoop Job,直到完成,这就是工作流Job的生命周期。
那么,现在我们有一个工作流Job,希望每天半夜00:00启动运行,我们能够想到的就是通过写一个定时脚本来调度程序运行。如果我们有多个工作流Job,使用crontab的方式调用可能需要编写大量的脚本,还要通过脚本来控制好各个工作流Job的执行时序问题,不但脚本不好维护,而且监控也不方便。基于这样的背景,Oozie提出了Coordinator的概念,他们能够将每个工作流Job作为一个动作(Action)来运行,相当于工作流定义中的一个执行节点(我们可以理解为工作流的工作流),这样就能够将多个工作流Job组织起来,称为Coordinator Job,并指定触发时间和频率,还可以配置数据集、并发数等。一个Coordinator Job包含了在Job外部设置执行周期和频率的语义,类似于在工作流外部增加了一个协调器来管理这些工作流的工作流Job的运行。
运行Coordinator Job
我们先看一下官方发行包自带的一个简单的例子oozie-3.3.2\examples\src\main\apps\cron,它能够实现定时调度一个工作流Job运行,这个例子中给出的一个空的工作流Job,也是为了演示能够使用Coordinator系统给调度起来。这个例子有3个配置文件,我们不修改workflow.xml配置内容。修改后分别如下所示:
- job.properties配置
nameNode=hdfs://m1:9000 jobTracker=m1:19830 queueName=default examplesRoot=examples oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron start=2014-03-04T19:00Z end=2014-03-06T01:00Z workflowAppUri=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron
修改了Hadoop集群的配置,以及调度起止时间范围。
- workflow.xml配置
<workflow-app xmlns="uri:oozie:workflow:0.2" name="no-op-wf"> <start to="end"/> <end name="end"/> </workflow-app>
是一个空Job,没做任何修改。
- coordinator.xml配置
<coordinator-app name="cron-coord" frequency="${coord:minutes(2)}" start="${start}" end="${end}" timezone="UTC" xmlns="uri:oozie:coordinator:0.2"> <action> <workflow> <app-path>${workflowAppUri}</app-path> <configuration> <property> <name>jobTracker</name> <value>${jobTracker}</value> </property> <property> <name>nameNode</name> <value>${nameNode}</value> </property> <property> <name>queueName</name> <value>${queueName}</value> </property> </configuration> </workflow> </action> </coordinator-app>
修改上述coordinator.xml配置文件,将定时调度频率改为2分钟,然后需要将他们上传到HDFS上:
hadoop fs -rm /user/shirdrn/examples/apps/cron/coordinator.xml hadoop fs -put /home/shirdrn/cloud/programs/oozie-3.3.2/examples/target/oozie-examples-3.3.2-examples/examples/apps/cron/coordinator.xml /user/shirdrn/examples/apps/cron/
因为我之前已经上传过一次,所以修改了coordinator.xml文件配置内容后,一定要上传到HDFS中,而job.properties配置可以通过指定config选项来执行。启动一个Coordinator Job和启动一个Oozie工作流Job类似,执行如下命令即可:
bin/oozie job -oozie http://oozie-server:11000/oozie -config /home/shirdrn/cloud/programs/oozie-3.3.2/examples/target/oozie-examples-3.3.2-examples/examples/apps/cron/job.properties -run
运行上面命令,在控制台上会返回这个Job的ID,我们也可以通过Oozie的Web控制台来查看:
- Coordinator Job状态
- Coordinator Job详情
如果想要杀掉一个Job,需要指定Oozie的Job ID,可以执行如下命令:
bin/oozie job -oozie http://oozie-server:11000/oozie -kill 0000065-140302210847342-oozie-shir-C
Coordinator应用(Coordinator Application)
Coordinator应用是指当满足一定条件时,会触发Oozie工作流Job(在Coordinator中将工作流Job定义为一个动作(Action))。其中,触发条件可以是一个时间频率、一个dataset实例是否可用,或者可能是外部的其他事件。
Coordinator Job是一个Coordinator应用的运行实例,这个Coordinator Job是在Oozie提供的Coordinator引擎上运行的,并且这个实例从指定的时间开始,直到运行结束。一个Coordinator Job具有以上几个状态:
- PREP
- RUNNING
- RUNNINGWITHERROR
- PREPSUSPENDED
- SUSPENDED
- SUSPENDEDWITHERROR
- PREPPAUSED
- PAUSED
- PAUSEDWITHERROR
- SUCCEEDED
- DONEWITHERROR
- KILLED
- FAILED
从状态字符串的含义,我们大概就能知道它的含义,这里不做过多解释,可以查阅官方文档。现在,我们关注一下这些状态之间是怎样转移的,从一个状态变成哪些状态是合法的,如下表所示:
转移前状态 | 转以后状态集合 |
PREP | PREPSUSPENDED | PREPPAUSED | RUNNING | KILLED |
RUNNING | RUNNINGWITHERROR | SUSPENDED | PAUSED | SUCCEEDED | KILLED |
RUNNINGWITHERROR | RUNNING | SUSPENDEDWITHERROR | PAUSEDWITHERROR | DONEWITHERROR | KILLED | FAILED |
PREPSUSPENDED | PREP | KILLED |
SUSPENDED | RUNNING | KILLED |
SUSPENDEDWITHERROR | RUNNINGWITHERROR | KILLED |
PREPPAUSED | PREP | KILLED |
PAUSED | SUSPENDED | RUNNING | KILLED |
PAUSEDWITHERROR | SUSPENDEDWITHERROR | RUNNINGWITHERROR | KILLED |
我们可以看到,Coordinator Job的状态比一个基本的Oozie工作流Job的状态要复杂的多,因为Coordinator Job的基本执行单元可能是一个基本Oozie Job,而且外加了一些调度信息,必然要增加额外的状态来描述。
Coordinator动作(Coordinator Action)
一个Coordinator Job会创建并执行Coordinator 动作(Coordinator Action)。通常一个Coordinator 动作是一个工作流Job,这个工作流Job会生成一个dataset实例并处理这个数据集。当一个一个Coordinator 动作被创建以后,它会一直等待满足执行条件的所有输入事件的完成然后执行,或者发生超时。
每个Coordinator Job都有一个驱动事件,来决定它所包含的Coordinator动作的初始化(创建)。对于同步Coordinator Job(synchronous coordinator job)来说,触发执行频率(frequency)就是一个驱动事件。
同样,组成Coordinator Job的基本单元是Coordinator 动作(Coordinator Action),它不像Oozie工作流Job只有OK和Error两个执行结果,一个Coordinator 动作的状态集合,如下所示:
- WAITING
- READY
- SUBMITTED
- TIMEDOUT
- RUNNING
- KILLED
- SUCCEEDED
- FAILED
一个Coordinator 动作的状态变迁情况,如下表所示:
转移前状态 | 转以后状态集合 |
WAITING | READY | TIMEDOUT | KILLED |
READY | SUBMITTED | KILLED |
SUBMITTED | RUNNING | KILLED | FAILED |
RUNNING | SUCCEEDED | KILLED | FAILED |
Coordinator应用定义(Coordinator Application Definition)
一个同步的Coordinator应用定义的语法格式,如下所示:
<coordinator-app name="[NAME]" frequency="[FREQUENCY]" start="[DATETIME]" end="[DATETIME]" timezone="[TIMEZONE]" xmlns="uri:oozie:coordinator:0.1"> <controls> <timeout>[TIME_PERIOD]</timeout> <concurrency>[CONCURRENCY]</concurrency> <execution>[EXECUTION_STRATEGY]</execution> </controls> <datasets> <include>[SHARED_DATASETS]</include> ... <!-- Synchronous datasets --> <dataset name="[NAME]" frequency="[FREQUENCY]" initial-instance="[DATETIME]" timezone="[TIMEZONE]"> <uri-template>[URI_TEMPLATE]</uri-template> </dataset> ... </datasets> <input-events> <data-in name="[NAME]" dataset="[DATASET]"> <instance>[INSTANCE]</instance> ... </data-in> ... <data-in name="[NAME]" dataset="[DATASET]"> <start-instance>[INSTANCE]</start-instance> <end-instance>[INSTANCE]</end-instance> </data-in> ... </input-events> <output-events> <data-out name="[NAME]" dataset="[DATASET]"> <instance>[INSTANCE]</instance> </data-out> ... </output-events> <action> <workflow> <app-path>[WF-APPLICATION-PATH]</app-path> <configuration> <property> <name>[PROPERTY-NAME]</name> <value>[PROPERTY-VALUE]</value> </property> ... </configuration> </workflow> </action> </coordinator-app>
基于上述定义语法格式,我们分别说明对应元素的含义,如下所示:
- control元素
control元素定义了一个Coordinator Job的控制信息,主要包括如下三个配置元素:
元素名称 | 含义说明 |
timeout | 超时时间,单位为分钟。当一个Coordinator Job启动的时候,会初始化多个Coordinator动作,timeout用来限制这个初始化过程。默认值为-1,表示永远不超时,如果为0 则总是超时。 |
concurrency | 并发数,指多个Coordinator Job并发执行,默认值为1。 |
execution | 配置多个Coordinator Job并发执行的策略:默认是FIFO。另外还有两种:LIFO(最新的先执行)、LAST_ONLY(只执行最新的Coordinator Job,其它的全部丢弃)。 |
throttle | 一个Coordinator Job初始化时,允许Coordinator动作处于WAITING状态的最大数量。 |
- Dataset元素
Coordinator Job中有一个Dataset的概念,它可以为实际计算提供计算的数据,主要是指HDFS上的数据目录或文件,能够配置数据集生成的频率(Frequency)、URI模板、时间等信息,下面看一下dataset的语法格式:
<dataset name="[NAME]" frequency="[FREQUENCY]" initial-instance="[DATETIME]" timezone="[TIMEZONE]"> <uri-template>[URI TEMPLATE]</uri-template> <done-flag>[FILE NAME]</done-flag> </dataset>
举例如下:
<dataset name="stats_hive_table" frequency="${coord:days(1)}" initial-instance="2014-03-05T00:00Z" timezone="America/Los_Angeles"> <uri-template> hdfs://m1:9000/hive/warehouse/user_events/${YEAR}${MONTH}/${DAY}/data </uri-template> <done-flag>donefile.flag</done-flag> </dataset>
上面会每天都会生成一个用户事件表,可以供Hive查询分析,这里指定了这个数据集的位置,后续计算会使用这部分数据。其中,uri-template指定了一个匹配的模板,满足这个模板的路径都会被作为计算的基础数据。
另外,还有一种定义dataset集合的方式,将多个dataset合并成一个组来定义,语法格式如下所示:
<datasets> <include>[SHARED_DATASETS]</include> ... <dataset name="[NAME]" frequency="[FREQUENCY]" initial-instance="[DATETIME]" timezone="[TIMEZONE]"> <uri-template>[URI TEMPLATE]</uri-template> </dataset> ... </datasets>
- input-events和output-events元素
一个Coordinator应用的输入事件指定了要执行一个Coordinator动作必须满足的输入条件,在Oozie当前版本,只支持使用dataset实例。
一个Coordinator动作可能会生成一个或多个dataset实例,在Oozie当前版本,输出事件只支持输出dataset实例。
EL常量
常量表示形式 | 含义说明 |
${coord:minutes(int n)} | 返回日期时间:从一开始,周期执行n分钟 |
${coord:hours(int n)} | 返回日期时间:从一开始,周期执行n * 60分钟 |
${coord:days(int n)} | 返回日期时间:从一开始,周期执行n * 24 * 60分钟 |
${coord:months(int n)} | 返回日期时间:从一开始,周期执行n * M * 24 * 60分钟(M表示一个月的天数) |
${coord:endOfDays(int n)} | 返回日期时间:从当天的最晚时间(即下一天)开始,周期执行n * 24 * 60分钟 |
${coord:endOfMonths(1)} | 返回日期时间:从当月的最晚时间开始(即下个月初),周期执行n * 24 * 60分钟 |
${coord:current(int n)} | 返回日期时间:从一个Coordinator动作(Action)创建时开始计算,第n个dataset实例执行时间 |
${coord:dataIn(String name)} | 在输入事件(input-events)中,解析dataset实例包含的所有的URI |
${coord:dataOut(String name)} | 在输出事件(output-events)中,解析dataset实例包含的所有的URI |
${coord:offset(int n, String timeUnit)} | 表示时间偏移,如果一个Coordinator动作创建时间为T,n为正数表示向时刻T之后偏移,n为负数向向时刻T之前偏移,timeUnit表示时间单位(选项有MINUTE、HOUR、DAY、MONTH、YEAR) |
${coord:hoursInDay(int n)} | 指定的第n天的小时数,n>0表示向后数第n天的小时数,n=0表示当天小时数,n<0表示向前数第n天的小时数 |
${coord:daysInMonth(int n)} | 指定的第n个月的天数,n>0表示向后数第n个月的天数,n=0表示当月的天数,n<0表示向前数第n个月的天数 |
${coord:tzOffset()} | ataset对应的时区与Coordinator Job的时区所差的分钟数 |
${coord:latest(int n)} | 最近以来,当前可以用的第n个dataset实例 |
${coord:future(int n, int limit)} | 当前时间之后的dataset实例,n>=0,当n=0时表示立即可用的dataset实例,limit表示dataset实例的个数 |
${coord:nominalTime()} | nominal时间等于Coordinator Job启动时间,加上多个Coordinator Job的频率所得到的日期时间。例如:start=”2009-01-01T24:00Z”,end=”2009-12-31T24:00Z”,frequency=”${coord:days(1)}”,frequency=”${coord:days(1)},则nominal时间为:2009-01-02T00:00Z、2009-01-03T00:00Z、2009-01-04T00:00Z、…、2010-01-01T00:00Z |
${coord:actualTime()} | Coordinator动作的实际创建时间。例如:start=”2011-05-01T24:00Z”,end=”2011-12-31T24:00Z”,frequency=”${coord:days(1)}”,则实际时间为:2011-05-01,2011-05-02,2011-05-03,…,2011-12-31 |
${coord:user()} | 启动当前Coordinator Job的用户名称 |
${coord:dateOffset(String baseDate, int instance, String timeUnit)} | 计算新的日期时间的公式:newDate = baseDate + instance * timeUnit,如:baseDate=’2009-01-01T00:00Z’,instance=’2′,timeUnit=’MONTH’,则计算得到的新的日期时间为’2009-03-01T00:00Z’。 |
${coord:formatTime(String timeStamp, String format)} | 格式化时间字符串,format指定模式 |
配置举例
下面,根据官网上给出的例子,进行说明,配置例子如下所示:
<coordinator-app name="hello2-coord" frequency="${coord:days(7)}" start="2009-01-07T24:00Z" end="2009-12-12T24:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <datasets> <dataset name="logs" frequency="${coord:days(1)}" initial-instance="2009-01-01T24:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY} </uri-template> </dataset> <dataset name="weeklySiteAccessStats" frequency="${coord:days(7)}" initial-instance="2009-01-07T24:00Z" timezone="UTC"> <uri-template>hdfs://bar:8020/app/weeklystats/${YEAR}/${MONTH}/${DAY} </uri-template> </dataset> </datasets> <input-events> <data-in name="input" dataset="logs"> <start-instance>${coord:current(-6)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <output-events> <data-out name="output" dataset="siteAccessStats"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path> <configuration> <property> <name>wfInput</name> <value>${coord:dataIn('input')}</value> </property> <property> <name>wfOutput</name> <value>${coord:dataOut('output')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
名称为logs的dataset实例频率为1天,它配置的初始实例时间为2009-01-07T24:00Z,则在input-events输入事件中开始实例(start-instance)时间为6天前,即2009-01-01T24:00Z,结束实例(end-instance)时间为当天时间。
后半部分中定义了action,其中${coord:dataIn(‘input’)}表示解析名称为input的输入事件所关联的URI(即HDFS上的文件或目录)。
参考链接
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
写的真好,通俗易懂,又很详细。
不错,写得还行
智商不够用了吗。。。
竟然没太懂
楼主,您好,请问一下,我完全按照你上面的步骤做的,在运行时报错:Error: HTTP error code: 500 : Internal Server Error
查看下了日志如下:
八月 02, 2015 7:06:36 下午 org.apache.catalina.core.ApplicationContext log
严重: StandardWrapper.Throwable
java.lang.NullPointerException
at org.apache.oozie.servlet.JsonRestServlet.init(JsonRestServlet.java:215)
at org.apache.catalina.core.StandardWrapper.loadServlet(StandardWrapper.java:1213)
at org.apache.catalina.core.StandardWrapper.allocate(StandardWrapper.java:827)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:129)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293)
at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:861)
at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:620)
at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:489)
at java.lang.Thread.run(Thread.java:744)
八月 02, 2015 7:06:36 下午 org.apache.catalina.core.StandardWrapperValve invoke
严重: Allocate exception for servlet versions
java.lang.NullPointerException
at org.apache.oozie.servlet.JsonRestServlet.init(JsonRestServlet.java:215)
at org.apache.catalina.core.StandardWrapper.loadServlet(StandardWrapper.java:1213)
at org.apache.catalina.core.StandardWrapper.allocate(StandardWrapper.java:827)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:129)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293)
at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:861)
at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:620)
at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:489)
at java.lang.Thread.run(Thread.java:744)
楼主能否给提点下多谢。
500错误,你这tomcat就没成功启动啊,不知道这个java.lang.NullPointerException是哪里报出来的
LZ你好,我想请教下你, 我在配置coordinator.xml文件的时候
hdfs://hostname:9000/user/xxx/xxx/wx.xx_job/2015-11-19/00
_done
${coord:current(0)}
${coord:current(0)}
${workflowAppUri}
reportDate
${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), “yyyy-MM-dd”)}
reportHour
${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), “HH”)}
jobTracker
${jobTracker}
nameNode
${nameNode}
queueName
${queueName}
wfInput
${coord:dataIn(‘input’)}
任务设定是每天跑,但是我设置datasets标签的frequency属性为20分钟执行一次里面的模板作为基础数据。但是20分钟之后数据路径存在了,但是任务不执行也就是我的workflow.xml文件。
希望LZ赐教一二,谢谢!
hdfs://hostname:9000/user/xxx/xxx/wx.xx_job/2015-11-19/00
_done
${coord:current(0)}
${coord:current(0)}
网站过滤了符号吗?
有什么办法可以获取job id吗?我说的是命令,想在自动化的时候杀死jobs.
oozie的workflow调用java工程并且传递CLI参数,可以直接用arg形式传?java那边接收时候,options可以收到吗?
这个应该可以,和你把参数通过命令行传给脚本,脚本中再传给Java main方法类是一致的。
java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/ResourceMgrDelegate
at org.apache.hadoop.mapred.YARNRunner.(YARNRunner.java:112)
at org.apache.hadoop.mapred.YarnClientProtocolProvider.create(YarnClientProtocolProvider.java:34)
at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:95)
at org.apache.hadoop.mapreduce.Cluster.(Cluster.java:82)
at org.apache.hadoop.mapreduce.Cluster.(Cluster.java:75)
at org.apache.hadoop.mapred.JobClient.init(JobClient.java:475)
at org.apache.hadoop.mapred.JobClient.(JobClient.java:454)
at org.apache.oozie.service.HadoopAccessorService$3.run(HadoopAccessorService.java:514)
at org.apache.oozie.service.HadoopAccessorService$3.run(HadoopAccessorService.java:512)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
就是说找不到类,jar都放进 sharelib里了
想问问 如果 想设置 每个月 的 几号 和 每周 星期几 跑 任务 可以实现吗