Oozie工作流程定义详解

Oozie工作流程定义是一个DAG(Directed Acyclical Graphs)图,它由控制流节点(Control Flow Nodes)或动作节点(Action Nodes)组成,各个节点又是通过表征转移的箭线(transitions
arrows)互相连通。对于工作流一般对应存在流程定义语言,例如jBPM是jPDL,大多数都是基于XML定义的,Oozie流程定义语言也是基于XML定义的,称为hPDL(Hadoop Process Definition Language)。
下面,我们详细说明工作流定义相关的内容:

工作流生命周期

在Oozie中,工作流的状态可能存在如下几种:

状态 含义说明
PREP 一个工作流Job第一次创建将处于PREP状态,表示工作流Job已经定义,但是没有运行。
RUNNING 当一个已经被创建的工作流Job开始执行的时候,就处于RUNNING状态。它不会达到结束状态,只能因为出错而结束,或者被挂起。
SUSPENDED 一个RUNNING状态的工作流Job会变成SUSPENDED状态,而且它会一直处于该状态,除非这个工作流Job被重新开始执行或者被杀死。
SUCCEEDED 当一个RUNNING状态的工作流Job到达了end节点,它就变成了SUCCEEDED最终完成状态。
KILLED 当一个工作流Job处于被创建后的状态,或者处于RUNNING、SUSPENDED状态时,被杀死,则工作流Job的状态变为KILLED状态。
FAILED 当一个工作流Job不可预期的错误失败而终止,就会变成FAILED状态。

上述各种状态存在相应的转移(工作流程因为某些事件,可能从一个状态跳转到另一个状态),其中合法的状态转移有如下几种,如下表所示:

转移前状态 转移后状态集合
未启动 PREP
PREP RUNNING、KILLED
RUNNING SUSPENDED、SUCCEEDED、KILLED、FAILED
SUSPENDED RUNNING、KILLED

明确上述给出的状态转移空间以后,可以根据实际需要更加灵活地来控制工作流Job的运行。

控制流节点(Control Flow Nodes)

工作流程定义中,控制工作流的开始和结束,以及工作流Job的执行路径的节点,它定义了流程的开始(start节点)和结束(end节点或kill节点),同时提供了一种控制流程执行路径的机制(decision决策节点、fork分支节点、join会签节点)。通过上面提到的各种节点,我们大概应该能够知道它们在工作流中起着怎样的作用。下面,我们看一下不同节点的语法格式:

  • start节点
  • 	<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    		...
    		<start to="[NODE-NAME]" />
    		...
    	</workflow-app>
    	

    上面start元素的to属性,指向第一个将要执行的工作流节点。

  • end节点
  • 	<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    		...
    		<end name="[NODE-NAME]" />
    		...
    	</workflow-app>
    	

    达到该节点,工作流Job会变成SUCCEEDED状态,表示成功完成。需要注意的是,一个工作流定义必须只能有一个end节点。

  • kill节点
  • 	<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    		...
    		<kill name="[NODE-NAME]">
    			<message>[MESSAGE-TO-LOG]</message>
    		</kill>
    		...
    	</workflow-app>
    	

    kill元素的name属性,是要杀死的工作流节点的名称,message元素指定了工作流节点被杀死的备注信息。达到该节点,工作流Job会变成状态KILLED。

  • decision节点
  • 	<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    		...
    		<decision name="[NODE-NAME]">
    			<switch>
    				<case to="[NODE_NAME]">[PREDICATE]</case>
    				...
    				<case to="[NODE_NAME]">[PREDICATE]</case>
    				<default to="[NODE_NAME]" />
    			</switch>
    		</decision>
    		...
    	</workflow-app>
    	

    decision节点通过预定义一组条件,当工作流Job执行到该节点时,会根据其中的条件进行判断选择,满足条件的路径将被执行。decision节点通过switch…case语法来进行路径选择,只要有满足条件的判断,就会执行对应的路径,如果没有可以配置default元素指向的节点。

  • fork节点和join节点
  • 	<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    		...
    		<fork name="[FORK-NODE-NAME]">
    			<path start="[NODE-NAME]" />
    			...
    			<path start="[NODE-NAME]" />
    		</fork>
    		...
    		<join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
    		...
    	</workflow-app>
    	

    for元素下面会有多个path元素,指定了可以并发执行的多个执行路径。fork中多个并发执行路径会在join节点的位置会合,只有所有的路径都到达后,才会继续执行join节点。

动作节点(Action Nodes)

工作流程定义中,能够触发一个计算任务(Computation Task)或者处理任务(Processing Task)执行的节点。所有的动作(
Action)都有一些基本的特性,我先首先来看一下:

  • 远程执行
  • 对Oozie来说,动作节点的执行都是远程的,因为Oozie可能部署在一个单独的服务器上,而工作流Job是在Hadoop集群的节点上执行的。即使Oozie在Hadoop集群的某个节点上,它也是处于与Hadoop进行独立无关的JVM示例之中(Oozie部署在Servlet容器当中)。

  • 异步性
  • 动作节点的执行,对于Oozie来说是异步的。Oozie启动一个工作流Job,这个工作流Job便开始执行。Oozie可以通过两种方式来探测工作流Job的执行情况:一种是基于回调机制,对每个任务的执行(可以看成是动作节点的执行)都对应一个唯一的URL,如果任务执行结束或者执行失败,会通过回调这个URL通知Oozie已经完成;另一种就是轮询,Oozie不停地去查询任务执行的完成状态,如果由于网络故障回调机制失败,也会使用轮询的方式来处理。

  • 执行结果要么成功,要么失败
  • 如果动作节点执行成功,则会转向ok节点;如果失败则会转向error节点。

  • 可恢复性
  • 如果一个动作节点执行失败,Oozie提供了一些恢复执行的策略,这个要根据失败的特点来进行:如果是状态转移过程中失败,Oozie会根据指定的重试时间间隔去重新执行;如果不是转移性质的失败,则只能通过手工干预来进行恢复;如果重试恢复执行都没有解决问题,则最终会跳转到error节点。

下面详细介绍Oozie内置支持的动作节点类型,如下所示:

  • Map-Reduce动作

map-reduce动作会在工作流Job中启动一个MapReduce Job任务运行,我们可以详细配置这个MapReduce Job。另外,可以通过map-reduce元素的子元素来配置一些其他的任务,如streaming、pipes、file、archive等等。
下面给出包含这些内容的语法格式说明:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<map-reduce>
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<streaming>
				<mapper>[MAPPER-PROCESS]</mapper>
				<reducer>[REDUCER-PROCESS]</reducer>
				<record-reader>[RECORD-READER-CLASS]</record-reader>
				<record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
				...
				<env>[NAME=VALUE]</env>
				...
			</streaming>
			<!-- Either streaming or pipes can be specified for an action, not both -->
			<pipes>
				<map>[MAPPER]</map>
				<reduce>
					[REDUCER]
				</reducer>
					<inputformat>[INPUTFORMAT]</inputformat>
					<partitioner>[PARTITIONER]</partitioner>
					<writer>[OUTPUTFORMAT]</writer>
					<program>[EXECUTABLE]</program>
			</pipes>
			<job-xml>[JOB-XML-FILE]</job-xml>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<file>[FILE-PATH]</file>
			...
			<archive>[FILE-PATH]</archive>
			...
		</map-reduce>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>
  • Hive动作

Hive主要是基于类似SQL的HQL语言的,它能够方便地操作HDFS中数据,实现对海量数据的分析工作。HIve动作的语法格式如下所示:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
	...
	<action name="[NODE-NAME]">
		<hive xmlns="uri:oozie:hive-action:0.2">
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<script>[HIVE-SCRIPT]</script>
			<param>[PARAM-VALUE]</param>
			...
		</hive>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>
  • Sqoop动作

Sqoop是一个能够在Hadoop和结构化存储系统之间进行数据的导入导出的工具,Sqoop动作的语法格式如下:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
	...
	<action name="[NODE-NAME]">
		<sqoop xmlns="uri:oozie:sqoop-action:0.2">
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<command>[SQOOP-COMMAND]</command>
			<file>[FILE-PATH]</file>
			...
		</sqoop>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>
  • Pig动作

pig动作可以启动运行pig脚本实现的Job,在工作流定义中配置的语法格式说明如下:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
	...
	<action name="[NODE-NAME]">
		<pig>
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<job-xml>[JOB-XML-FILE]</job-xml>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<script>[PIG-SCRIPT]</script>
			<param>[PARAM-VALUE]</param>
			...
			<param>[PARAM-VALUE]</param>
			<argument>[ARGUMENT-VALUE]</argument>
			...
			<argument>[ARGUMENT-VALUE]</argument>
			<file>[FILE-PATH]</file>
			...
			<archive>[FILE-PATH]</archive>
			...
		</pig>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>
  • Fs动作

Fs动作主要是基于HDFS的一些基本操作,如删除路径、创建路径、移动文件、设置文件全乡等等。
语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<fs>
			<delete path='[PATH]' />
			...
			<mkdir path='[PATH]' />
			...
			<move source='[SOURCE-PATH]' target='[TARGET-PATH]' />
			...
			<chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false' />
			...
			<touchz path='[PATH]' />
		</fs>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>
  • SSH动作

该动作主要是通过ssh登录到一台主机,能够执行一组shell命令,它在Oozie schema 0.2中已经被删除。
语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<ssh>
			<host>[USER]@[HOST]</host>
			<command>[SHELL]</command>
			<args>[ARGUMENTS]</args>
			...
			<capture-output />
		</ssh>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>
  • Java动作

Java动作,是执行一个具有main入口方法的应用程序,在Oozie工作流定义中,会作为一个MapReduce Job执行,这个Job只有一个Map任务。我们需要指定NameNode、JobTracker的信息,还有配置一个Java应用程序的JVM选项参数(java-opts),以及传给主函数(arg)。
语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<java>
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<job-xml>[JOB-XML]</job-xml>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<main-class>[MAIN-CLASS]</main-class>
			<java-opts>[JAVA-STARTUP-OPTS]</java-opts>
			<arg>ARGUMENT</arg>
			...
			<file>[FILE-PATH]</file>
			...
			<archive>[FILE-PATH]</archive>
			...
			<capture-output />
		</java>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>
  • Sub-workflow动作

Sub-workflow动作是一个子流程的动作,主流程执行过程中,遇到子流程节点执行时,会一直等待子流程节点执行完成后,才能继续跳转到下一个要执行的节点。
语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<sub-workflow>
			<app-path>[WF-APPLICATION-PATH]</app-path>
			<propagate-configuration />
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
		</sub-workflow>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>
  • Shell动作

Shell动作可以执行Shell命令,并通过配置命令所需要的参数。它的语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.4">
	...
	<action name="[NODE-NAME]">
		<shell xmlns="uri:oozie:shell-action:0.2">
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<exec>[SHELL-COMMAND]</exec>
			<argument>[ARGUMENT-VALUE]</argument>
			<capture-output />
		</shell>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

表达式语言函数(Expression Language Functions)

Oozie除了可以使用Properties文件定义一些属性之外,还提供了一些内置的EL函数,能够方便地实现流程的定义和控制,下面我们分组列表说明:

  • 基本EL常量
常量名称 含义说明
KB 1KB,类型为long。
MB 1MB,类型为long。
GB 1GB,类型为long。
TB 1TB,类型为long。
PB 1PB,类型为long。
  • 基本EL函数
函数声明 含义说明
String firstNotNull(String value1, String value2) 返回value1和value2中不为null的值,若都为null则返回null
String concat(String s1, String s2) 连接字符串s1和s2,如果s1或s2为null值,则使用空字符串替换null值
String replaceAll(String src, String regex, String replacement) 满足正则表达式regex,则使用replace替换src字符串中匹配上的部分
String appendAll(String src, String append, String delimeter) 将src中的分隔符delimeter替换为append
String trim(String s) 去掉字符串两边的空格,如果s为null则返回空字符串
String urlEncode(String s) 对字符串s使用URL UTF-8进行编码
String timestamp() 返回UTC当前时间字符串,格式为YYYY-MM-DDThh:mm:ss.sZ
String toJsonStr(Map) Oozie 3.3支持,将Map转转成一个XML编码的JSON表示形式
String toPropertiesStr(Map) Oozie 3.3支持,将Map转转成一个XML编码的Properties表示形式
String toConfigurationStr(Map) Oozie 3.3支持,将Map转转成一个XML编码的Configuration表示形式
  • 工作流EL函数
函数声明 含义说明
String wf:id() 返回当前的工作流Job的ID
String wf:name() 返回当前的工作流Job的名称
String wf:appPath() 返回当前的工作流Job的应用路径
String wf:conf(String name) 返回当前的工作流Job的配置属性
String wf:user() 返回启动当前的工作流Job的用户名称
String wf:group() 返回当前的工作流Job的的用户组名称
String wf:callback(String stateVar) 返回当前的工作流Job的当前动作节点的回调URL
String wf:transition(String node) 返回转移节点,该节点是一个工作流动作节点触发的
String wf:lastErrorNode() 返回最后一个以ERROR状态退出的节点名称
String wf:errorCode(String node) 返回指定动作节点执行的错误码,如果没有则返回空
String wf:errorMessage(String message) 返回指定动作节点执行的错误信息,如果没有则返回空
int wf:run() 返回当前工作流Job的运行编号,正常的话返回0,如果执行过re-run则返回非0
Map wf:actionData(String node) 返回当前动作节点完成时输出的信息
int wf:actionExternalId(String node) 返回动作节点的外部ID
int wf:actionTrackerUri(String node) 返回跟踪一个动作节点的URI
int wf:actionExternalStatus(String node) 返回一个动作节点的状态
  • Hadoop EL常量
常量名称 含义说明
RECORDS Hadoop Record计数器组名称
MAP_IN Hadoop Mapper输入Record计数器名称
MAP_OUT Hadoop Mapper输出Record计数器名称
REDUCE_IN Hadoop Reducer输入Record计数器名称
REDUCE_OUT HadoopReducer输出Record计数器名称
GROUPS 1024 * Hadoop Mapper/Reducer输入Record组计数器名称
  • Hadoop EL函数
函数声明 含义说明
Map < String, Map > hadoop:counters(String node) 返回工作流Job某个动作节点的统计计数器信息,例如,MR的动作统计集合内容:
{
“ACTION_TYPE”: “MAP_REDUCE”,
“org.apache.hadoop.mapred.JobInProgress$Counter”: {
“TOTAL_LAUNCHED_REDUCES”: 1,
“TOTAL_LAUNCHED_MAPS”: 1,
“DATA_LOCAL_MAPS”: 1
},
“FileSystemCounters”: {
“FILE_BYTES_READ”: 1746,
“HDFS_BYTES_READ”: 1409,
“FILE_BYTES_WRITTEN”: 3524,
“HDFS_BYTES_WRITTEN”: 1547
},
“org.apache.hadoop.mapred.Task$Counter”: {
“REDUCE_INPUT_GROUPS”: 33,
“COMBINE_OUTPUT_RECORDS”: 0,
“MAP_INPUT_RECORDS”: 33,
“REDUCE_SHUFFLE_BYTES”: 0,
“REDUCE_OUTPUT_RECORDS”: 33,
“SPILLED_RECORDS”: 66,
“MAP_OUTPUT_BYTES”: 1674,
“MAP_INPUT_BYTES”: 1409,
“MAP_OUTPUT_RECORDS”: 33,
“COMBINE_INPUT_RECORDS”: 0,
“REDUCE_INPUT_RECORDS”: 33
}
}
则${hadoop:counters(“mr-node”)["FileSystemCounters"]["FILE_BYTES_READ"]},得到名称为mr-node的动作节点组的FILE_BYTES_READ计数器的值
  • HDFS EL函数
选项 含义说明
boolean fs:exists(String path) path是否存在
boolean fs:isDir(String path) path是否是目录
long fs:dirSize(String path) 如果path不是目录或者path是一个文件,则返回-1,否则返回该path下所有文件的字节数
long fs:fileSize(String path) 如果path是目录,则返回-1,否则返回该path下所有文件的字节数
long fs:blockSize(String path) 如果path不是文件或者不存在则返回-1,否则返回文件的块大小字节数

参考链接

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>