Apache Pig简介与实践

Apache Pig是一个用来分析大数据集的平台,它由两部分组成:一部分是用于表达数据分析程序的高级脚本语言,另一部分是用于评估分析程序的基本工具。目前来看,Pig主要用于离线数据的批量处理应用场景,但是随着Pig的发展处理数据的速度会不断地提升,这可能依赖于Pig底层的执行引擎。比如,Pig通过指定执行模式,可以使用Hadoop的MapReduce计算引擎来实现数据处理,也可以使用基于Tez的计算引擎来实现(Tez是为了绕开MapReduce多阶段Job写磁盘而设计的DAG计算引擎,性能应该比MapReduce要快),看到Pig未来的发展路线图,以后可能会基于Storm或Spark计算平台实现底层计算引擎,那样速度会有极大地提升。
我们基于最新的0.15.0版本的Pig(Hadoop使用的是2.2.0版本),通过编写一些例子脚本来实践Pig的语言特性。

Pig安装与执行

Pig安装非常简单,只需要下载Pig包,然后解压缩即可:

wget http://mirror.bit.edu.cn/apache/pig/pig-0.15.0/pig-0.15.0.tar.gz
tar xvzf pig-0.15.0.tar.gz
sudo ln -s /usr/local/pig-0.15.0 /usr/local/pig
cd /usr/local/pig
bin/pig -x mapreduce

如果希望直接使用pig命令,可以修改环境变量文件~/.bashrc,增加如下配置:

export PIG_HOME=/usr/local/pig
export PATH=$PATH:$PIG_HOME/bin

使变量配置生效:

. ~/.bashrc

Pig支持如下4种执行模式:

  • 本地模式

本地模式主要是基于本地文件系统,比较适合调试脚本使用。进入本地模式执行如下命令:

pig -x local
  • Tez本地模式

Tez本地模式类似于前面的本地模式,它使用Tez运行时引擎,进入Tez本地模式执行如下命令:

pig -x tez_local

不过该模式还处于试验阶段,不过多累述。

  • MapReduce模式

MapReduce模式基于Hadoop,数据存储在HDFS上,它基于运行于YARN之上的MapReduce进行处理。进入MapReduce运行模式执行如下命令:

pig -x mapreduce

一般,我们的数据都是存储在HDFS上的,使用该模式能够充分利用Hadoop集群的计算能力。

  • Tez模式

基于Tez模式执行,需要在安装Hadoop集群的时候,修改Hadoop配置文件mapred-site.xml,将属性mapreduce.framework.name的值设置为yarn-tez。进入Tez模式执行如下命令:

pig -x tez

有关Tez相关内容,可以查看Apache Tez官网介绍。

数据类型

Pig的数据类型可以分为2类,分别为简单类型和复杂类型。简单类型包括:
int、long、float、double、chararray、bytearray、boolean、datetime、biginteger、bigdecimal。复杂类型包括:tuple、bag、map。
这里对特别的数据类型,解释说明一下:
chararray相当于字符串String;bytearray相当于字节数组;tuple是一个有序的字段的集合,可以理解为元组,例如(3090018, ‘Android’, 76);bag是tuple的集合,例如{(3090018, ‘Android’, 76), (3090019, ‘iOS’, 172)};map是键值对的集合,例如[name#Jeff Stone, age#28, healthy index#195.58]。

基本操作符

  • 算数操作符(Arithmetic Operators)包括:+、-、*、/、%、?:、CASE WHEN THEN ELSE END。
  • 布尔操作符(Boolean Operators)包括:AND、OR、IN、NOT。
  • 类型转换操作符(Cast Operators):使用圆括号包含类型名,作用于一个字段,例如(int)age、(map[])、(chararray)COUNT($2)、(tuple(chararray,int,map[]))name_age_scores等等。
  • 比较操作符(Comparison Operators)包括:==、!=、<、>、<=、>=、matches。其中,matches比较操作符使用Java的Pattern进行匹配来比较,例如user_name matches ‘[a-n]{3,12}’。
  • 类型构造操作符(Type Construction Operators):可以创建复杂类型的数据,tuple使用(),map使用[],bag使用{},例如FOREACH users GENERATE (name, age, address)。
  • 解引用操作符(Dereference Operators):解引用主要是针对集合类型tuple、bag、map,从集合中拿到对应字段的值。比如对于tuple,定义类型t=tuple(t1:int,t2:int,t3:int),则我要获取字段t1和t3的值,一种方式可以通过t.t1和t.t3得到,也可以通过t.$0和t.$2获取到。

关系操作符

操作符 语法 说明
ASSERT ASSERT alias BY expression [, message]; 断言:判定某个字段的值的条件为true
COGROUP alias = COGROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; 数据分组,与GROUP相同,但是至多支持127个关系
CROSS alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n]; 笛卡尔积
CUBE alias = CUBE alias BY { CUBE expression | ROLLUP expression }, [ CUBE expression | ROLLUP expression ] [PARALLEL n]; 计算CUBE,支持ROLLUP操作
DEFINE DEFINE macro_name (param [, param ...]) RETURNS {void | alias [, alias ...]} { pig_latin_fragment };
DEFINE alias {function | [`command` [input] [output] [ship] [cache] [stderr] ] };
定义宏(类似函数),能够重用脚本代码
为UDF或streaming设置别名
DISTINCT alias = DISTINCT alias [PARTITION BY partitioner] [PARALLEL n]; 去重操作,可以指定并行度(即Reducer个数)
FILTER alias = FILTER alias BY expression; 条件过滤
FOREACH alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema]….];

alias = FOREACH nested_alias {
alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …]
GENERATE expression [AS schema] [expression [AS schema]….]
};

基于列对数据进行转换
GROUP alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; 数据分组操作:
只支持一个关系
PARALLEL子句可以指定并行度(Reducer个数)
IMPORT IMPORT ‘file-with-macro’; 导入外部Pig脚本
JOIN alias = JOIN alias BY {expression|’(‘expression [, expression …]‘)’} (, alias BY {expression|’(‘expression [, expression …]‘)’} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];

alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];

内连接
外连接
LIMIT alias = LIMIT alias n; 输出结果集的n个记录
LOAD LOAD ‘data’ [USING function] [AS schema]; 从数据源加载数据
MAPREDUCE alias1 = MAPREDUCE ‘mr.jar’ STORE alias2 INTO ‘inputLocation’ USING storeFunc LOAD ‘outputLocation’ USING loadFunc AS schema [`params, ... `]; 在Pig中执行MapReduce程序,需要指定使用的MapReduce程序JAR文件
ORDER BY alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n]; 排序
RANK alias = RANK alias [ BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [DENSE] ]; 排名操作:可能有排名相同的,即排名序号相同
SAMPLE SAMPLE alias size; 用于采样,size范围[0, 1]
SPLIT SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE]; 将一个大表,拆分成多个小表
STORE STORE alias INTO ‘directory’ [USING function]; 存储结果到文件系统:如果为指定USING子句,则使用默认的PigStorage(),更多可以查看“Load/Store函数”。
STREAM alias = STREAM alias [, alias …] THROUGH {`command` | cmd_alias } [AS schema] ; 将数据发送到外部程序或者脚本
UNION alias = UNION [ONSCHEMA] alias, alias [, alias …]; 计算并集

关系操作符示例

  • ASSERT
live_user_ids = LOAD '/test/live_user_ids' USING PigStorage() AS (udid: chararray);
ASSERT live_user_ids BY udid != null, 'udid MUST NOT be NULL!';

上面断言表live_user_ids中的udid字段一定存在值。

  • GROUP

根据某个或某些字段进行分组,只根据一个字段进行分组,比较简单。如果想要根据两个字段分组,则可以将两个字段构造成一个tuple,然后进行分组。Pig脚本如下所示:

events = LOAD '/data/etl/hive_input/20150613/basis_event_2015061306-r-00002'  USING PigStorage('\t');
projected_events = FOREACH events GENERATE $1 AS (event_code: long), $2 AS (udid: chararray), $10 AS (network: chararray), $34 AS (area_code: int); -- $1是表events的第2个字段
uniq_events = DISTINCT projected_events;
uniq_events = FILTER uniq_events BY (event_code IS NOT NULL) AND (udid IS NOT NULL) AND (network IS NOT NULL) AND (area_code IS NOT NULL);
groupped = GROUP uniq_events BY (udid, event_code) PARALLEL 2; -- 指定使用2个Reducer
selected10 = LIMIT groupped 10;
DUMP selected10;

从HDFS加载的文件中执行投影操作,生成包含event_code、udid、network、area_code这4个字段的一个表projected_events,接着执行去重、条件过滤操作,计算分组的时候基于event_code、udid两个字段进行分组。

  • FOREACH

FOREACH操作可以针对一个数据集进行迭代处理操作,生成一个新的数据集。它有2种使用方法,一种是执行投影操作,选择部分字段的数据,例如脚本:

provinces = LOAD '/test/provinces' USING PigStorage(',') AS (country_id: int, province_id: int, name: chararray);
compositekeyed_provinces = FOREACH provinces GENERATE (CONCAT(CONCAT((chararray)country_id, '_'), (chararray)province_id) AS pid, name);

这里,将原数据集的两个主键字段的值进行拼接合并,作为新表的一个字段。
另一种是,支持在FOREACH操作中使用代码段,可以增加更复杂的处理逻辑,摘自官网的例子,例如脚本:

a = LOAD '/test/data' AS (url:chararray, outlink:chararray);
DUMP a;
(www.ccc.com,www.hjk.com)
(www.ddd.com,www.xyz.org)
(www.aaa.com,www.cvn.org)
(www.www.com,www.kpt.net)
(www.www.com,www.xyz.org)
(www.ddd.com,www.xyz.org)
b = GROUP a BY url;
DUMP b;
(www.aaa.com,{(www.aaa.com,www.cvn.org)})
(www.ccc.com,{(www.ccc.com,www.hjk.com)})
(www.ddd.com,{(www.ddd.com,www.xyz.org),(www.ddd.com,www.xyz.org)})
(www.www.com,{(www.www.com,www.kpt.net),(www.www.com,www.xyz.org)})

result = FOREACH b {
        filterda = FILTER a BY outlink == 'www.xyz.org'; -- 过滤掉outlink字段值为'www.xyz.org'的记录
        filtered_outlinks = filterda.outlink;
        filtered_outlinks = DISTINCT filtered_outlinks;  -- 对outlink集合进行去重
        GENERATE group, COUNT(filtered_outlinks); -- 根据对表a进行分组得到group,计算每个分组中outlink的数量
};
DUMP result;
(www.aaa.com,0)
(www.ccc.com,0)
(www.ddd.com,1)
(www.www.com,1)

上面,表b的第一个字段为chararray类型的字段(存放域名字符串),第二个字段是一个bag类型的字段(存在当前域名的出链接,即<url, outlink>的集合),最后统计的结果是给定的url的出链接的个数。

  • FILTER

根据条件进行过滤,相当于SQL中WHERE子句。示例脚本如下所示:

live_info = LOAD '/test/live_info' USING PigStorage() AS (id: long, name: chararray); 
newly_added_lives = FILTER live_info BY (id % 140000 >= 0) AND (name matches '[a-zA-Z0-9]{8, 32}' OR name == 'test');

上面内容很好理解,不再累述。

  • JOIN

JOIN操作支持支持配置并行度,指定Reducer的数量。
表连接操作,支持内连接和外连接,内连接脚本示例如下:

live = LOAD '/test/shiyj/pig/pig_live' USING PigStorage('\t') AS (id: long, name: chararray);
program = LOAD '/test/shiyj/pig/pig_live_program' USING PigStorage('\t') AS (id: long,name: chararray,live_id: long,live_start: chararray,live_end: chararray);
program_info = JOIN live BY id, program BY live_id;
DUMP program_info;

根据表live的id字段,表program的live_id字段进行内连接。
外连接的操作,官网给出了4个例子,可以分别看一下。左外连接例子如下:

A = LOAD 'a.txt' AS (n:chararray, a:int);
B = LOAD 'b.txt' AS (n:chararray, m:chararray);
C = JOIN A by $0 LEFT OUTER, B BY $0; -- 表A和B的字段n进行左外连接

全外连接的例子如下所示:

A = LOAD 'a.txt' AS (n:chararray, a:int);
B = LOAD 'b.txt' AS (n:chararray, m:chararray);
C = JOIN A BY $0 FULL, B BY $0; -- 使用FULL关键字

支持复制的左外连接(Replicated Join),示例如下:

A = LOAD 'large';
B = LOAD 'tiny';
C= JOIN A BY $0 LEFT, B BY $0 USING 'replicated';

只有左外连接支持这种方式,实际上Replicated Join会在MapReduce的Map阶段把做左表进行复制,也就是说做表应该是小表,能够在内存中放得下,然后与右表进行连接操作。
还有一种使用Skewed Join,示例如下所示:

A = LOAD 'studenttab' as (name, age, gpa);
B = LOAD 'votertab' as (name, age, registration, contribution);
C = JOIN A BY name FULL, B BY name USING 'skewed';

只有在进行外连接的两表的数据,明显不对称,称为数据倾斜,一个表很大,另一个表相对小,但是内存中放不下,这种情况可以使用Skewed Join操作。目前,Pig支持基于两表的Skewed Join操作。

  • DISTINCT

去重操作使用DISTINCT,比较简单,示例如下所示:

live_user_ids = LOAD '/test/live_user_ids' USING PigStorage() AS (udid: chararray);
uniq_user_ids = DISTINCT live_user_ids PARALLEL 8;

DISTINCT操作支持配置并行度,指定Reducer的数量。

  • UNION

计算并集操作,使用UNION操作符,示例如下所示:

a = LOAD 'data' AS (a1:int,a2:int,a3:int);
b = LOAD 'data' AS (b1:int,b2:int);
u = UNION a, b;

计算并集,不要求两表的字段数一定相同。

  • LIMIT

LIMIT选择计算结果的一部分,示例如下所示:

top100_user_ids = LIMIT live_user_ids 100;
  • STORE

STORE操作用来保存计算结果,示例如下所示:

STORE play_users INTO '/test/shiyj/tmp.play_users' USING PigStorage ('\t');

如果没有指定USING子句,则默认使用PigStorage()函数,另外Pig还支持如下的Store/Load函数:

BinStorage()
JsonLoader(['schema'])
JsonStorage()
PigDump()
PigStorage([field_delimiter] , ['options'])
TextLoader()
HBaseStorage('columns', ['options'])
AvroStorage(['schema|record name'], ['options'])
TrevniStorage(['schema|record name'], ['options'])
AccumuloStorage(['columns'[, 'options']])
OrcStorage(['options'])

具体使用方法,可以参考文档介绍。

  • CROSS

比较容易理解,摘自官网上的例子,如下所示:

-- 加载表a数据
a = LOAD 'data1' AS (a1:int,a2:int,a3:int);
DUMP a;
(1,2,3)
(4,2,1)

-- 加载表b数据
b = LOAD 'data2' AS (b1:int,b2:int);
DUMP b;
(2,4)
(8,9)
(1,3)

-- 计算笛卡尔积,并输出结果
result = CROSS a, b;
DUMP result;
(1,2,3,2,4)
(1,2,3,8,9)
(1,2,3,1,3)
(4,2,1,2,4)
(4,2,1,8,9)
(4,2,1,1,3)
  • CUBE

这个操作符功能比较强大,如下所示:

users = LOAD '/test/shiyj/pig/pig_live_users' AS (create_date,room_id,audio_id,udid);
groupped = COGROUP users BY (create_date,room_id,audio_id);
groupped_count = FOREACH groupped {
     uniq = DISTINCT users.udid;
     GENERATE group, COUNT(uniq);
};
STORE groupped_count INTO '/test/shiyj/pig/groupped_count' USING PigStorage('\t'); -- 将分组统计的结果保存到HDFS

groupped_count = LOAD '/test/shiyj/pig/groupped_count/part-r-*' AS (k: tuple(chararray, long, long), cnt: int); -- 加载前面保存的结果,进行CUBE计算
groupped_count = FOREACH groupped_count GENERATE k.$0, k.$1, k.$2, cnt;
cubed_users = CUBE groupped_count BY CUBE($0, $1, $2);
DUMP cubed_users;

我们可以看下官网文档的例子,简单比较容易理解:
(1)CUBE操作
Pig脚本内容,如下所示:

salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS (product:chararray, year:int, region:chararray, state:chararray, city:chararray, sales:long);
cubedinp = CUBE salesinp BY CUBE(product,year);
result = FOREACH cubedinp GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales;
DUMP result;

如果输入数据为(car, 2012, midwest, ohio, columbus, 4000),则上面脚本执行CUBE操作,结果输出内容如下所示:

(car,2012,4000)
(car,,4000)
(,2012,4000)
(,,4000)

上面针对产品(product)和年度(year)两个维度进行查询。
(2)ROLLUP操作
CUBE操作支持ROLLUP(上卷操作),例如脚本内容:

salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS (product:chararray, year:int, region:chararray, state:chararray, city:chararray, sales:long);
rolledup = CUBE salesinp BY ROLLUP(region,state,city);
result = FOREACH rolledup GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales;
DUMP result;

同样如果输入tuple值为(car, 2012, midwest, ohio, columbus, 4000),则ROLLUP操作结果如下所示:

(midwest,ohio,columbus,4000)
(midwest,ohio,,4000)
(midwest,,,4000)
(,,,4000)

上面只是根据ROLLUP表达式指定的维度执行CUBE操作。
(3)合并CUBE和ROLLUP操作
还可以将CUBE操作和ROLLUP操作合并起来,例如执行脚本内容:

salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS (product:chararray, year:int, region:chararray, state:chararray, city:chararray, sales:long);
cubed_and_rolled = CUBE salesinp BY CUBE(product,year), ROLLUP(region, state, city); 
result = FOREACH cubed_and_rolled GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales;

上面的CUBE操作等价于下面两中操作:

cubed_and_rolled = CUBE salesinp BY CUBE(product,year,region, state, city); 
-- 或
cubed_and_rolled = CUBE salesinp BY ROLLUP(product,year,region, state, city); 

执行结果,如下所示:

(car,2012,midwest,ohio,columbus,4000)
(car,2012,midwest,ohio,,4000)
(car,2012,midwest,,,4000)
(car,2012,,,,4000)
(car,,midwest,ohio,columbus,4000)
(car,,midwest,ohio,,4000)
(car,,midwest,,,4000)
(car,,,,,4000)
(,2012,midwest,ohio,columbus,4000)
(,2012,midwest,ohio,,4000)
(,2012,midwest,,,4000)
(,2012,,,,4000)
(,,midwest,ohio,columbus,4000)
(,,midwest,ohio,,4000)
(,,midwest,,,4000)
(,,,,,4000)
  • SAMPLE

对数据进行取样操作,脚本如下所示:

-- 加载原始数据集,并计算记录数
users = LOAD '/test/shiyj/pig/pig_live_users' AS (create_date,room_id,audio_id,udid);
g_users = GROUP users ALL;
total_user_cnt = FOREACH g_users GENERATE COUNT(users);
DUMP total_user_cnt;

-- 15%取样,计算取样记录数
sampled_users = SAMPLE users 0.15;
g_sampled_users = GROUP sampled_users ALL;
sampled_user_cnt = FOREACH g_sampled_users GENERATE COUNT(sampled_users);
DUMP sampled_user_cnt;
  • MAPREDUCE

MAPREDUCE操作允许在Pig脚本内部执行MapReduce程序,示例脚本来自官网,如下所示:

A = LOAD 'WordcountInput.txt';
B = MAPREDUCE 'wordcount.jar' STORE A INTO 'inputDir' LOAD 'outputDir'
    AS (word:chararray, count: int) `org.myorg.WordCount inputDir outputDir`;

如果直接写原生的MapReduce程序各个能解决实际问题,可以将写好的程序打包,在Pig脚本中指定相关参数即可运行。

  • SPLIT

将一个表拆分成多个表,可以按照“水平拆分”的思想进行操作,根据某些条件来生成新表。示例如下所示:

A = LOAD 'data' AS (f1:int,f2:int,f3:int); -- 加载数据到表A
DUMP A;               
(1,2,3)
(4,5,6)
(7,8,9)       

SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6); -- A表中满足条件f1<7的记录插入到表X中,满足条件f2==5的记录插入到T表中,满足条件(f3<6 OR f3>6)的记录插入到Z表中

DUMP X;
(1,2,3)
(4,5,6)

DUMP Y;
(4,5,6)

DUMP Z;
(1,2,3)
(7,8,9)

求值函数(Eval Functions)

函数 语法 说明
AVG AVG(expression) 计算某一个数字类型的列的均值,数字类型支持:int,long,float,double,bigdecimal,biginteger,bytearray
BagToString BagToString(vals:bag [, delimiter:chararray]) 将bag转换成字符串,可以指定分隔符,适合拼接bag中字符串
CONCAT CONCAT(expression, expression, [...expression]) 字符串拼接
COUNT COUNT(expression) 计算一个bag中元素的总数,不含NULL值
COUNT_STAR COUNT_STAR(expression) 计算一个bag中元素的总数,包含NULL值
DIFF DIFF (expression, expression) 比较一个tuple中的两个字段,这两个字段都是bag类型,结果返回在两个bag中不同的元素,结果仍然是一个bag
IsEmpty IsEmpty(expression) 检查一个map或bag是否为空
MAX MAX(expression) 计算最大值,支持数组类型:int,long,float,double,bigdecimal,biginteger,bytearray
MIN MIN(expression) 计算最小值,支持数组类型:int,long,float,double,bigdecimal,biginteger,bytearray
PluckTuple DEFINE pluck PluckTuple(expression1)
DEFINE pluck PluckTuple(expression1,expression3)
pluck(expression2)
允许定义一个字符串前缀,然后过滤指定的列,满足:一概字符串前缀开始,或者匹配该正则表达式,下面是官网的例子:

a = LOAD 'a' as (x, y); 
b = LOAD 'b' as (x, y);
c = JOIN a by x, b by x; -- 表a和b连接,因为表a和b有相同的列名,所以连接后添加前缀“表名::”来区分

DEFINE pluck PluckTuple('a::'); <code>-- 定义前缀"a::",等价于DEFINE pluck PluckTuple('a::', true);
d = FOREACH c GENERATE FLATTEN(pluck(*)); -- 包含前缀"a::"的都保留
DESCRIBE c;
c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray}
DESCRIBE d;
d: {plucked::a::x: bytearray,plucked::a::y: bytearray}

DEFINE pluckNegative PluckTuple('a::','false'); -- 定义前缀"a::",包含该前缀的过滤掉
d = FOREACH c GENERATE FLATTEN(pluckNegative(*));
DESCRIBE d; -- 结果中包含前缀"a::"的被排除掉
d: {plucked::b::x: bytearray,plucked::b::y: bytearray}
SIZE SIZE(expression) 计算Pig指定数据类型的元素的数量,支持类型:int,long,float,double,,chararray,bytearray、tuple、bag、map
SUBTRACT SUBTRACT(expression, expression) bag操作符,用来对两个bag做差集操作,结果为:包含在第一个bag中但不包含在第二个bag中的元素
SUM SUM(expression) 求和操作,支持类型:int,long,float,double,bigdecimal,biginteger,将bytearray转换为double类型
TOKENIZE TOKENIZE(expression [, 'field_delimiter']) 拆分一个字符串,得到一个bag结果集

数学函数

数学函数比较简单,不再详细描述,主要包括如下20个:

ABS
ACOS
ASIN
ATAN
CBRT
CEIL
COS
COSH
EXP
FLOOR
LOG
LOG10
RANDOM
ROUND
ROUND_TO
SIN
SINH
SQRT
TAN
TANH

具体使用可以查看官方文档。

字符串函数

字符串函数非常常用,主要包括如下20个:

ENDSWITH
EqualsIgnoreCase
INDEXOF
LAST_INDEX_OF
LCFIRST
LOWER
LTRIM
REGEX_EXTRACT
REGEX_EXTRACT_ALL
REPLACE
RTRIM
SPRINTF
STARTSWITH
STRSPLIT
STRSPLITTOBAG
SUBSTRING
TRIM
UCFIRST
UPPER
UniqueID

使用方法可以查看文档。

日期时间函数

日期函数有下面24个,如下所示:

AddDuration
CurrentTime
DaysBetween
GetDay
GetHour
GetMilliSecond
GetMinute
GetMonth
GetSecond
GetWeek
GetWeekYear
GetYear
HoursBetween
MilliSecondsBetween
MinutesBetween
MonthsBetween
SecondsBetween
SubtractDuration
ToDate
ToMilliSeconds
ToString
ToUnixTime
WeeksBetween
YearsBetween

集合函数

集合函数主要是,将其他类型的数据转换为集合类型tuple、bag、map,如下所示:
TOTUPLE
TOBAG
TOMAP
TOP
前面3个都是生成集合的函数,最后一个用来计算一个集合中的topN个元素,可以指定是按照升序/降序得到的结果,语法为TOP(topN,column,relation)。

Hive UDF函数

在Pig中可以直接调用Hive的UDF,HiveUDAF和HiveUDTF,语法如下表所示:

函数 语法 说明
HiveUDF HiveUDF(name[, constant parameters])
DEFINE sin HiveUDF('sin'); -- 定义HiveUDF,后面可以直接使用函数sin
a = LOAD 'student' AS (name:chararray, age:int, gpa:double);
b = FOREACH a GENERATE sin(gpa); -- 使用函数sin
HiveUDAF HiveUDAF(name[, constant parameters])
DEFINE explode HiveUDTF('explode');
a = LOAD 'mydata' AS (a0:{(b0:chararray)});
b = FOREACH a GENERATE FLATTEN(explode(a0));
HiveUDTF HiveUDTF(name[, constant parameters])
DEFINE avg HiveUDAF('avg');
a = LOAD 'student' AS (name:chararray, age:int, gpa:double);
b = GROUP a BY name;
c = FOREACH b GENERATE group, AVG(a.age);

Pig UDF

Pig也支持用户自定义函数UDF,而且支持使用多种编程语言来实现UDF,目前支持的变成语言包括:Java、JavaScript、Jython、Ruby、Groovy、Python。
以Java为例,可以通过继承自类org.apache.pig.EvalFunc来实现一个UDF,将实现的UDF泪打包后,Pig安装目录下的CLASSPATH下面,然后可以在Pig脚本中使用。例如,我们实现的UDF类为org.shirdrn.pig.udf.IPAddressConverterUDF,用来根据ip代码(long类型),转换为对应的点分十进制的IP地址字符串,打包后JAR文件名称为iptool.jar,则可以在Pig脚本中这样使用:

REGISTER 'iptool.jar';
a = LOAD '/data/etl/$date_string/$event_file' AS (event_code: long, udid: chararray, ip_code: long, network: chararray);
b = FOREACH a GENERATE event_code, udid, org.shirdrn.etl.pig.udf.IPAddressConverterUDF(ip_code);
DUMP b;

也可以实现一个自定义的累加器(Accumulator)或者过滤器,或者其他一些功能,可以实现相关的接口:Algebraic,Accumulator,FilterFunc,LoadFunc,StoreFunc,具体可以参考相关文档或资料。
另外,也可以通过在PiggyBank在来查找其它用户分享的UDF,可以在 http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank中找到。

相关问题总结

  • 运行Pig脚本,从外部向脚本传递参数

在实际使用中,我们经常需要从外部传递参数到Pig脚本中,例如,文件路径,或者日期时间,等等,可以直接使用pig命令的-p参数从外部传参,例如,有下面的Pig脚本compute_event_user_count.pig:

a = LOAD '/data/etl/hive_input/20150613/basis_event_2015061305-r-00001' USING PigStorage('\t');
DESCRIBE a;
b = FOREACH a GENERATE $1 AS event_code, $2 AS udid, $4 AS install_id;
c = GROUP b BY (event_code, udid);
d = FOREACH c GENERATE group, COUNT($1);
DUMP d;

上面我们是直接将文件路径写死在脚本中,如果需要从外部传递输入文件、输出目录,则可以改写为:

a = LOAD '$input_file' USING PigStorage('\t');
DESCRIBE a;
b = FOREACH a GENERATE $1 AS event_code, $2 AS udid, $4 AS install_id;
c = GROUP b BY (event_code, udid);
d = FOREACH c GENERATE group, COUNT($1);
STORE d INTO '$output_dir' USING PigStorage ('\t');

则可以执行Pig脚本,并传递参数:

bin/pig -p input_file=/data/etl/hive_input/20150613/basis_event_2015061305-r-00001 -p output_dir=/test/shiyj/pig/example_output -x mapreduce compute_event_user_count.pig

这样就可以实现从外部向Pig脚本传递参数,可以到HDFS上查看结果输出文件/test/shiyj/pig/example_output/part-r-00000。pig命令更多选项,可以查看pig帮助命令:

bin/pig -h
  • 运行Pig脚本出现异常“Retrying connect to server: 0.0.0.0/0.0.0.0:10020”

实际应用中,我们几乎不可能将Pig安装到Hadoop集群的NameNode所在的节点,如果可以安装到NameNode节点上,基本不会报这个错误的。这个错误主要有是由于Pig没有安装在NameNode节点上,而是以外的其它节点上,它在执行计算过程中,需要与MapReduce的JobHistoryServer的IPC服务进行通信,所以在安装Hadoop时需要允许JobHistoryServer的IPC主机和端口被外部其它节点访问,只需要修改etc/hadoop/mapreduce-site.xml配置文件,增加如下配置即可:

<property>
                <name>mapreduce.jobhistory.address</name>
                <value>10.10.4.130:10020</value>
                <description>MapReduce JobHistory Server IPC host:port</description>
        </property>

如果第一次安装Hadoop没有配置该属性mapreduce.jobhistory.address,那么Hadoop集群的所有节点上会使用默认的配置值为0.0.0.0:10020,所以如果不在NameNode上安装Pig程序,导致Pig所在的节点上安装的Hadoop的配置属性mapreduce.jobhistory.address使用默认值,也就是Pig所在节点0.0.0.0:10020,所以Pig脚本就会执行过程中与本机的10020端口通信,显然会失败的。
其实,如果已经在NameNode上启动了JobHistoryServer进程,只需要修改mapreduce.jobhistory.address的属性值,然后同步到所有安装Hadoop文件的节点,包括Pig所在节点即可,不需要重启NameNode节点上的JobHistoryServer进程。如果没有在NameNode上启动JobHistoryServer进程,执行如下命令启动即可:

mr-jobhistory-daemon.sh start historyserver

参考链接

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>