Apache Crunch: Simplifying the Development of MapReduce Pipelines

Apache Crunch provides a set of Java APIs that simplify writing, testing, and running MapReduce pipeline programs. The basic idea of Crunch is to hide the details of writing MapReduce programs behind a functional-style programming interface. Because Java does not support functional programming natively, these operations can only be expressed through callbacks, so the code is not as elegant and concise as true functional code, but the structure of a MapReduce program becomes very clear, and it is much easier than writing raw MapReduce code. If you write a complex pipeline directly against the MapReduce API, you have to think through the details of every job (what its Map and Reduce phases do); with the Crunch programming library you only need to lay out the processing steps of your business logic and call the operators Crunch provides (function-like operators such as union, join, filter, groupBy, sort, and so on).
Below is a brief overview of some of the functionality Crunch provides:

  • Crunch collections and their operations

Let's first look at the abstractions Crunch defines for the collection types used to process distributed data sets, shown in the class diagram below:
[Class diagram: crunch-collection-abstraction]
The diagram lists the method signatures of each collection interface; methods that share a name also have overloaded variants with different parameter lists. Crunch's high-level collection abstraction consists of just these three interfaces (PCollection, PTable, and PGroupedTable); for the concrete subclasses, see the Crunch source code.
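
A minimal sketch of how the three interfaces relate (the input path and the MyApp class name are just placeholders, and the imports are the same as in the WordCount example later in this article):

Pipeline pipeline = new MRPipeline(MyApp.class, new Configuration());
// PCollection: an immutable, distributed collection of elements
PCollection<String> lines = pipeline.readTextFile("/data/crunch/in");
// PTable: a PCollection of key/value pairs, created here with parallelDo and a table PType
PTable<String, Integer> pairs = lines.parallelDo(new MapFn<String, Pair<String, Integer>>() {
     private static final long serialVersionUID = 1L;
     @Override
     public Pair<String, Integer> map(String line) {
          return Pair.of(line, 1);
     }
}, Writables.tableOf(Writables.strings(), Writables.ints()));
// PGroupedTable: the result of grouping a PTable by key
PGroupedTable<String, Integer> grouped = pairs.groupByKey();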

  • Joins (Join)

In the class diagram above, the PTable interface has a join method, which by default performs an inner join (INNER JOIN). Crunch also provides a utility class, Join, that implements the various join operations as static methods, shown below:

public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right)
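
A quick sketch of calling these methods, assuming left and right are PTable<String, String> instances keyed by the same join key (as in the table join example later in this article):

// Inner join: keep only keys that appear in both tables
PTable<String, Pair<String, String>> inner = Join.innerJoin(left, right);
// Left join: keep every key of 'left'; the right-hand value is null when there is no match
PTable<String, Pair<String, String>> leftJoined = Join.leftJoin(left, right);
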
  • Sorting (Sort)

Sorting is provided by the Sort utility class, which exposes a set of sort-related static methods, shown below:

public static <T> PCollection<T> sort(PCollection<T> collection) 
public static <T> PCollection<T> sort(PCollection<T> collection, Order order)
public static <T> PCollection<T> sort(PCollection<T> collection, int numReducers, Order order)
public static <K, V> PTable<K, V> sort(PTable<K, V> table)
public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key)
public static <K, V> PTable<K, V> sort(PTable<K, V> table, int numReducers, Order key)
public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders)
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders)
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, int numReducers, ColumnOrder... columnOrders)

As you can see, sorting is very convenient; we will use it in an example later.
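
A few typical calls might look like the following sketch, assuming words is a PCollection<String> and counts a PTable<String, Integer> built earlier in a pipeline:

// Natural ascending sort of a PCollection
PCollection<String> ascending = Sort.sort(words);
// Sort a PTable by key in descending order
PTable<String, Integer> byKeyDesc = Sort.sort(counts, Sort.Order.DESCENDING);
// Sort pairs by their second column (the count), descending; this is the call used in the WordCount example below
PCollection<Pair<String, Integer>> byCountDesc =
     Sort.sortPairs(counts, Sort.ColumnOrder.by(2, Sort.Order.DESCENDING));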

  • Secondary sort (SecondarySort)

Secondary sorting is implemented by the SecondarySort utility class, again as a set of static methods, shown below:

public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype)
public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype, int numReducers)
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype)
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype, int numReducers)
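
As a hedged illustration, suppose events is a hypothetical PTable<String, Pair<Long, String>> of <userId, <timestamp, url>> pairs built earlier in a pipeline; sortAndApply hands the values of each key to the DoFn sorted by the first element of the pair (the timestamp here):

PCollection<String> visitLines = SecondarySort.sortAndApply(events,
     new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() {
          private static final long serialVersionUID = 1L;
          @Override
          public void process(Pair<String, Iterable<Pair<Long, String>>> input,
                    Emitter<String> emitter) {
               StringBuilder line = new StringBuilder(input.first());
               for(Pair<Long, String> visit : input.second()) {
                    // values arrive ordered by timestamp
                    line.append('\t').append(visit.second());
               }
               emitter.emit(line.toString()); // one line per user, visits in time order
          }
     }, Writables.strings());
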
  • Deduplication (Distinct)

The Distinct utility class provides the following static methods:

public static <S> PCollection<S> distinct(PCollection<S> input)
public static <K, V> PTable<K, V> distinct(PTable<K, V> input)
public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery)
public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery)
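
Usage is straightforward; a small sketch, assuming words is a PCollection<String> and userChannels a PTable<String, String> from earlier steps:

// Remove duplicate elements
PCollection<String> uniqueWords = Distinct.distinct(words);
// The flushEvery variant controls how many elements are buffered in memory before a flush
PTable<String, String> uniquePairs = Distinct.distinct(userChannels, 50000);
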
  • Sampling (Sample)

The Sample utility class provides a set of static methods, shown below:

public static <S> PCollection<S> sample(PCollection<S> input, double probability)
public static <S> PCollection<S> sample(PCollection<S> input, Long seed, double probability)
public static <K, V> PTable<K, V> sample(PTable<K, V> input, double probability)
public static <K, V> PTable<K, V> sample(PTable<K, V> input, Long seed, double probability)
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize)
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize, Long seed)
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize)
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize, Long seed)
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(PTable<Integer, Pair<T, N>> input, int[] sampleSizes)
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(PTable<Integer, Pair<T, N>> input, int[] sampleSizes, Long seed)
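
A brief sketch of a few of these calls, assuming lines is a PCollection<String> and userChannels a PTable<String, String>; the seed value 2015L is arbitrary:

// Keep each element independently with probability 0.1
PCollection<String> tenPercent = Sample.sample(lines, 0.1);
// Passing a seed makes the sample reproducible across runs
PTable<String, String> reproducible = Sample.sample(userChannels, 2015L, 0.1);
// Reservoir sampling returns a fixed-size sample regardless of input size
PCollection<String> hundred = Sample.reservoirSample(lines, 100);
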
  • Running native MapReduce programs

Through the two utility classes Mapreduce and Mapred (wrapping the new org.apache.hadoop.mapreduce API and the old org.apache.hadoop.mapred API, respectively), we can reuse our existing MapReduce code; their static methods are shown below:

// The Mapreduce utility class
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(
      PTable<K1, V1> input,
      Class<? extends Mapper<K1, V1, K2, V2>> mapperClass,
      Class<K2> keyClass, Class<V2> valueClass)
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(
      PGroupedTable<K1, V1> input,
      Class<? extends Reducer<K1, V1, K2, V2>> reducerClass,
          Class<K2> keyClass, Class<V2> valueClass)

// The Mapred utility class
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(
      PTable<K1, V1> input,
      Class<? extends Mapper<K1, V1, K2, V2>> mapperClass,
      Class<K2> keyClass, Class<V2> valueClass)
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(
      PGroupedTable<K1, V1> input,
      Class<? extends Reducer<K1, V1, K2, V2>> reducerClass,
          Class<K2> keyClass, Class<V2> valueClass)
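
A hedged sketch of reusing an existing new-API Mapper and Reducer through the Mapreduce utility class: ExistingMapper and ExistingReducer are hypothetical classes that extend org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable> and Reducer<Text, LongWritable, Text, LongWritable>, and rawTable is assumed to be a PTable<LongWritable, Text> produced earlier in the pipeline:

// Run the existing Mapper over a PTable
PTable<Text, LongWritable> mapped =
     Mapreduce.map(rawTable, ExistingMapper.class, Text.class, LongWritable.class);
// Group by key, then run the existing Reducer over each group
PTable<Text, LongWritable> reduced =
     Mapreduce.reduce(mapped.groupByKey(), ExistingReducer.class, Text.class, LongWritable.class);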

Below, we implement two examples to get a feel for how programming with Crunch differs from using the raw MapReduce API.

WordCount: computing word frequencies

The WordCount program we are going to implement with the Crunch library breaks down into the following steps:

  1. Read a directory of text files from HDFS
  2. Split each line on whitespace into <word, per-occurrence count> pairs (the Map phase of a MapReduce program)
  3. Group the resulting collection by key
  4. Reduce each group to a global count per word, giving <word, total count> pairs (the Reduce phase of a MapReduce program)
  5. Sort the result set by the global word count in descending order
  6. Write the result to HDFS

Based on these steps, the implementation with the Crunch library looks like this:

package org.shirdrn.crunch.examples;

import static org.apache.crunch.types.writable.Writables.ints;
import static org.apache.crunch.types.writable.Writables.strings;
import static org.apache.crunch.types.writable.Writables.tableOf;

import java.io.Serializable;
import java.util.Iterator;

import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.ColumnOrder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Strings;

public class WordCount extends Configured implements Tool, Serializable {

     private static final long serialVersionUID = 1L;

     @Override
     public int run(String[] args) throws Exception {
          if(args.length != 2) {
               System.err.println("Usage: hadoop jar crunch-0.0.1-SNAPSHOT" +
                         WordCount.class.getName() + " <input> <output>");
               return 1;
          }

          String inputPath = args[0];
          String outputPath = args[1];

          // Create a pipeline & read a text file
          Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
          PCollection<String> lines = pipeline.readTextFile(inputPath);

          // map
          PTable<String, Integer> mappedWords = map(lines);
         
          // group by key
          PGroupedTable<String, Integer> groupedWords = mappedWords.groupByKey();
         
          // reduce
          PTable<String, Integer> reducedWords = reduce(groupedWords);
         
          // sort
          PCollection<Pair<String, Integer>> sortedValues =
                    Sort.sortPairs(reducedWords, ColumnOrder.by(2, Sort.Order.DESCENDING));

          // write the result to a text file
          pipeline.writeTextFile(sortedValues, outputPath);

          // Execute the pipeline as a MapReduce
          PipelineResult result = pipeline.done();
          return result.succeeded() ? 0 : 1;
     }
    
     private final PTable<String, Integer> map(PCollection<String> lines) {
          PTable<String, Integer> mappedWords = lines.parallelDo(new DoFn<String, Pair<String, Integer>>() {
               private static final long serialVersionUID = 1L;
               private static final String PATTERN = "\\s+";
               @Override
               public void process(String input, Emitter<Pair<String, Integer>> emitter) {
                    if(!Strings.isNullOrEmpty(input)) {
                         for(String word : input.split(PATTERN)) {
                              if(!Strings.isNullOrEmpty(word)) {
                                   emitter.emit(Pair.of(word, 1));
                              }
                         }
                    }                   
               }
          }, tableOf(strings(), ints()));
          return mappedWords;
     }

     private final PTable<String, Integer> reduce(PGroupedTable<String, Integer> groupedWords) {
          PTable<String, Integer> reducedWords = groupedWords.combineValues(new CombineFn<String, Integer>() {
               private static final long serialVersionUID = 1L;
               @Override
               public void process(Pair<String, Iterable<Integer>> values, Emitter<Pair<String, Integer>> emitter) {
                    int count = 0;
                    Iterator<Integer> iter = values.second().iterator();
                    while(iter.hasNext()) {
                         count += iter.next();
                    }
                    emitter.emit(Pair.of(values.first(), count));
               }
          });
          return reducedWords;
     }

     public static void main(String[] args) throws Exception {
          ToolRunner.run(new Configuration(), new WordCount(), args);
     }

}

In the run method above, you can see clearly what each step in the sequence has to do. If we wrote the same thing with the MapReduce API, we could easily get lost in the mechanics of the API and lose an intuitive picture of the overall execution flow; this is especially hard for developers who are new to MapReduce.
A Crunch program is launched with the same command format as one written against the MapReduce API. To run our Crunch-based program, execute the following command:

hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.WordCount /data/crunch/in /data/crunch/out

The console output looks like this:

15/03/06 15:48:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/06 15:48:45 INFO impl.FileTargetImpl: Will write output files to new path: /data/crunch/out
15/03/06 15:48:45 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
15/03/06 15:48:46 INFO collect.PGroupedTableImpl: Setting num reduce tasks to 2
15/03/06 15:48:46 INFO client.RMProxy: Connecting to ResourceManager at h1/192.168.4.142:8032
15/03/06 15:48:47 INFO Configuration.deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize
15/03/06 15:48:47 INFO input.FileInputFormat: Total input paths to process : 1
15/03/06 15:48:47 INFO input.CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 2, size left: 51480009
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: number of splits:27
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0012
15/03/06 15:48:48 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0012
15/03/06 15:48:48 INFO mapreduce.Job: The url to track the job: http://h1:8088/proxy/application_1422867923292_0012/
15/03/06 15:48:48 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: Text(/data/crunch/in)+S0+GBK+combine+S1+SeqFile(/tmp/crun... ID=1 (1/2)"
15/03/06 15:48:48 INFO jobcontrol.CrunchControlledJob: Job status available at: http://h1:8088/proxy/application_1422867923292_0012/
15/03/06 15:54:57 INFO client.RMProxy: Connecting to ResourceManager at h1/192.168.4.142:8032
15/03/06 15:54:58 INFO input.FileInputFormat: Total input paths to process : 2
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: number of splits:24
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0013
15/03/06 15:54:58 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0013
15/03/06 15:54:58 INFO mapreduce.Job: The url to track the job: http://h1:8088/proxy/application_1422867923292_0013/
15/03/06 15:54:58 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: SeqFile(/tmp/crunch-1592377036/p1)+GBK+ungroup+PTables.va... ID=2 (2/2)"
15/03/06 15:54:58 INFO jobcontrol.CrunchControlledJob: Job status available at: http://h1:8088/proxy/application_1422867923292_0013/

As shown above, the WordCount program we implemented with the Crunch library is translated into two jobs (application_1422867923292_0012 and application_1422867923292_0013) when it runs on the Hadoop cluster, which can also be seen on the YARN applications page, as shown below:
[Screenshot: wordcount-mr-applications]
Once the jobs succeed, you can inspect the result with the following command:

[hadoop@h1 ~]$ hdfs dfs -cat /data/crunch/out/part-r-00000 | head
15/03/06 16:24:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[0,31498202]
[2.3.14,6696801]
[1,4099030]
[2.3.3,3364573]
[2.2.1,3204483]
[appstore,2896280]
[2.1.1,2664336]
[3.0.3,2246132]
[2.3.9,1669198]
[3.0.5,1596954]

The output matches what we expected from the program design.

A table join program

Writing a join with the raw MapReduce API is quite involved, especially if you do not yet have a deep understanding of how MapReduce executes; implementing it can be a little difficult, and the finished code is often not very intuitive either. Below we use the Crunch API to implement a join of two tables. First, prepare the data files as follows:

  • User info table

The user info table is stored in the file user_info.txt. The data comes from a mobile application, and each line contains two TAB-separated fields: the user ID and the promotion channel the user came from. Sample lines:

86756ed5cccd69494f6b94fc31638835        c9049310
c4b0657cf93fc4f05c87a76077e50090        c0000341
d5c33ce0a110ca3e2def912506bcd846        c9049310
7ed745aa273b67b7e9189256140c7220        c0087001
d31908a762c49c68826f9f5addb345cb        c0000106
dd3ced7cf15ba397f84eb3389b6ffc11        c0049146
c60bc23264e61e3ba553f6127138c77a        c6201204
a892a2001947d872435edddad44da7d7        c9049310
d31908a762c49c68826f9f5addb345cb        c9001500
dd3ced7cf15ba397f84eb3389b6ffc11        c6820012
c60bc23264e61e3ba553f6127138c77a        c0055102
a892a2001947d872435edddad44da7d7        c9049310
847383774f24b1904a9ada6af7028f52        c0049146
443f9f5c9f8c16a8a676bb567451a65f        c6820012
a6b88a8ad016af214ba30aef03b450eb        c0055102
f57cab9d9712e9884a5f4a373af8b220        c0055102
3762865e195e7885e41fe748671657c2        c0049146
34323bfbfe3ea8561952769af0bc117f        c9000541
9b4e112db554d44ae5c8140a40f7d389        c0049146
f26fd561ab357cf2aad041eaa17c30b4        c0055102
2812d7c1641580784b19f7ef1b1fdbf2        c6820012
7ed745aa273b67b7e9189256140c7220        c9049310
19bacba14597d2e8994ce50a5d47c2b4        c0055102
B9BEDD6A957377FDEA2A78D4B614531A        c9000201
7aee01193033491f12342a7b9dbbe4a3        c0000106
77b80cfc95b04ba427412ae5fc276d6f        c0055102
e1f7c264271e1ee2e2a78d0798354d89        c6820012
68aeafdfd07e85b41faafb33fc63600c        c0055102
556BB4023B23D4C8DD70E81FDD270121        c0049146
d31908a762c49c68826f9f5addb345cb        c0055102
14b769541197dc391767ddbce878393b        c0055102
c60bc23264e61e3ba553f6127138c77a        c0000106
2708bb4bb8f967c607bc82749d892ebf        c0055102
70F9442DAE5539ED99E280F12AFA4EC1        c9049310
  • Channel info table

The channel info table is stored in the file channel_info.txt. Each line contains two TAB-separated fields: the channel ID and the channel name. Sample lines:

c0000341        Anet
c9049310        Bwater
c0087001        cks005
c0000106        bjrow
c6201204        nabEer
c9001500        moppppy
c0055102        susu
c0049146        pwest
c6820012        9square
c9000541        house8
c9000201        netfyy

Since the user table stores only the channel ID, and the reports need to display the channel name, we join the two tables on the channel ID so that each user's channel name can be shown in the report.
Implementing this with Crunch is simple and clear; the code is as follows:

package org.shirdrn.crunch.examples;

import java.io.Serializable;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Join;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinUserChannel extends Configured implements Tool, Serializable {

     private static final long serialVersionUID = 1L;

     @Override
     public int run(String[] args) throws Exception {
          if(args.length != 3) {
               System.err.println("Usage: hadoop jar crunch-0.0.1-SNAPSHOT" +
                         JoinUserChannel.class.getName() + " <user_input> <channel_input> <output>");
               return 1;
          }

          String userPath = args[0];
          String channelPath = args[1];
          String outputPath = args[2];

          // Create a pipeline & read 2 text files
          Pipeline pipeline = new MRPipeline(JoinUserChannel.class, getConf());
         
          // user data
          PCollection<String> users = pipeline.readTextFile(userPath);
          PTypeFamily uTF = users.getTypeFamily();
          PTable<String, String> left = users.parallelDo(new DoFn<String, Pair<String, String>>() {
               private static final long serialVersionUID = 1L;
               @Override
               public void process(String input, Emitter<Pair<String, String>> emitter) {
                    String[] kv = input.split("\\s+");
                    if(kv.length == 2) {
                         String userId = kv[0];
                         String channelId = kv[1].trim();
                         emitter.emit(Pair.of(channelId, userId)); // key=channelId, value=userId
                    }
               }
              
          }, uTF.tableOf(uTF.strings(), uTF.strings()));
         
          // channel data
          PCollection<String> channels = pipeline.readTextFile(channelPath);
          PTypeFamily cTF = channels.getTypeFamily();
          PTable<String, String> right = channels.parallelDo(new DoFn<String, Pair<String, String>>() {
               private static final long serialVersionUID = 1L;
               @Override
               public void process(String input, Emitter<Pair<String, String>> emitter) {
                    String[] kv = input.split("\\s+");
                    if(kv.length == 2) {
                         String channelId = kv[0].trim();
                         String channelName = kv[1];
                         emitter.emit(Pair.of(channelId, channelName)); // key=channelId, value=channelName
                    }
               }
              
          }, cTF.tableOf(cTF.strings(), cTF.strings()));
         
          // join 2 tables & write to HDFS
          PTable<String, Pair<String, String>> joinedResult = Join.innerJoin(left, right);
          pipeline.writeTextFile(joinedResult, outputPath);
         
          // Execute the pipeline as a MapReduce.
          PipelineResult result = pipeline.done();
          return result.succeeded() ? 0 : 1;
     }
    
     public static void main(String[] args) throws Exception {
          ToolRunner.run(new Configuration(), new JoinUserChannel(), args);
         
     }

}

First, each of the two files is read into a PCollection<String> and turned into a PTable<String, String> via parallelDo; then the innerJoin method of the Join utility class performs the inner join, returning a PTable<String, Pair<String, String>>, which is finally written out to HDFS. For an inner join you can also call the join method on the PTable collection directly, which does the same thing as the Join utility's innerJoin and join methods, as shown below.
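
For instance, with the same left and right tables as in the code above:

// Equivalent inner join expressed directly on the PTable interface
PTable<String, Pair<String, String>> joinedResult = left.join(right);
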
To run the program, execute the following command:

hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.JoinUserChannel /data/crunch/user/user_info.txt /data/crunch/user/channel_info.txt /data/crunch/user/joined

Check the result as follows:

[hadoop@h1 crunch]$ hdfs dfs -ls /data/crunch/user/joined
15/03/06 18:53:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2015-03-06 18:53 /data/crunch/user/joined/_SUCCESS
-rw-r--r--   3 hadoop supergroup       1777 2015-03-06 18:53 /data/crunch/user/joined/part-r-00000
[hadoop@h1 crunch]$ hdfs dfs -cat /data/crunch/user/joined/part-r-00000 | head
15/03/06 18:54:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[c0000106,[7aee01193033491f12342a7b9dbbe4a3,bjrow]]
[c0000106,[c60bc23264e61e3ba553f6127138c77a,bjrow]]
[c0000106,[d31908a762c49c68826f9f5addb345cb,bjrow]]
[c0000341,[c4b0657cf93fc4f05c87a76077e50090,Anet]]
[c0049146,[3762865e195e7885e41fe748671657c2,pwest]]
[c0049146,[556BB4023B23D4C8DD70E81FDD270121,pwest]]
[c0049146,[9b4e112db554d44ae5c8140a40f7d389,pwest]]
[c0049146,[dd3ced7cf15ba397f84eb3389b6ffc11,pwest]]
[c0049146,[847383774f24b1904a9ada6af7028f52,pwest]]
[c0055102,[f26fd561ab357cf2aad041eaa17c30b4,susu]]

Summary

The two examples above are just two simple MapReduce programs developed with Crunch; whether you are an experienced developer or a newcomer, it is easy to get started.
Crunch is still a young project. The latest release is 0.11, which has so far only been tested against Hadoop 2.2.0 and earlier, so running it on a newer version may expose a few small problems or rough edges. The example programs above were run on Hadoop 2.6.0, and one issue we hit is that, when Crunch reads the Hadoop configuration on the node submitting the job, it does not accept block-size values written with a unit suffix such as 64m or 2g (which newer Hadoop versions allow); the block size must be configured as a plain number of bytes.
As Crunch continues to evolve, its features will surely become more complete and its API even more concise, especially around job tuning: Crunch aims to let programs run efficiently with almost no complex tuning configuration, which is something to look forward to.
Crunch also provides support for Spark as an execution engine; for details on running Crunch with Spark, see the official documentation.
