Apache Crunch provides a set of Java APIs that simplify writing, testing, and running MapReduce pipeline programs. The basic idea behind Crunch is to hide the low-level details of MapReduce programming: borrowing from functional programming, it defines a family of functional-style interfaces. Since Java does not support functional programming directly, these interfaces have to be implemented through callbacks, so the resulting code is not as compact or elegant as true functional code, but the logic of the MapReduce program stays very clear, and it is considerably easier than writing raw MapReduce programs. If you build a complex pipeline directly on the MapReduce API, you have to work out the details of every job (what each Map and Reduce implementation does); with the Crunch programming library, you only need to lay out the flow of operations that realizes your business logic and call the interfaces Crunch provides (function-like operators such as union, join, filter, groupBy, sort, and so on).
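To make this operator-chaining style concrete, here is a minimal sketch (it is not one of the examples from this article; the class name, paths, and the FilterFn are purely illustrative) of a pipeline that reads text, keeps only non-blank lines, and writes the result back out:

```java
import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

public class FilterSketch {
    public static void main(String[] args) {
        // Build a pipeline, read text lines, keep only non-blank lines, write them back out.
        Pipeline pipeline = new MRPipeline(FilterSketch.class, new Configuration());
        PCollection<String> lines = pipeline.readTextFile("/data/in");   // hypothetical input path
        PCollection<String> nonBlank = lines.filter(new FilterFn<String>() {
            @Override
            public boolean accept(String input) {
                return input != null && !input.trim().isEmpty();
            }
        });
        pipeline.writeTextFile(nonBlank, "/data/out");                   // hypothetical output path
        pipeline.done();                                                 // triggers the underlying MapReduce job(s)
    }
}
```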
Below is a brief overview of some of the functionality Crunch provides:
- Crunch collections and their operations
Let's first look at the abstractions Crunch defines for the collection types used to process distributed data sets, as shown in the following class diagram:
The diagram above lists the method signatures of the collection classes; methods that share a name also have overloaded variants with different parameter lists. The high-level abstraction of Crunch's collection types consists of just these three interfaces (PCollection, PTable, and PGroupedTable); for the concrete subclasses, refer to the Crunch source code.
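To show how these three abstractions relate to each other, here is a small hedged fragment (it assumes an already constructed Pipeline named pipeline and a hypothetical input path):

```java
// PCollection: an immutable, distributed collection of elements.
PCollection<String> lines = pipeline.readTextFile("/data/in");   // hypothetical path
// PTable: a PCollection of key/value pairs; count() yields a PTable<S, Long> of element frequencies.
PTable<String, Long> counts = lines.count();
// PGroupedTable: the result of grouping a PTable by key.
PGroupedTable<String, Long> grouped = counts.groupByKey();
```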
- Join operations
In the class diagram above, the PTable interface has a join method, which by default performs an inner join (INNER JOIN). Crunch also provides a Join utility class that implements the various join flavors through a set of static methods, as shown below:
```java
public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right)
```
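As a quick, hedged illustration of the difference between the variants (leftUsers and rightOrders are assumed PTables keyed by the same String key):

```java
// Inner join: only keys present on both sides appear in the result.
PTable<String, Pair<Integer, String>> inner = Join.innerJoin(leftUsers, rightOrders);
// Left join: every left-side record is kept; for keys with no match on the right,
// the second element of the Pair comes back as null.
PTable<String, Pair<Integer, String>> left = Join.leftJoin(leftUsers, rightOrders);
```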
- Sorting
Sorting is provided by the Sort utility class, which exposes a set of sort-related static methods, as shown below:
```java
public static <T> PCollection<T> sort(PCollection<T> collection)
public static <T> PCollection<T> sort(PCollection<T> collection, Order order)
public static <T> PCollection<T> sort(PCollection<T> collection, int numReducers, Order order)
public static <K, V> PTable<K, V> sort(PTable<K, V> table)
public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key)
public static <K, V> PTable<K, V> sort(PTable<K, V> table, int numReducers, Order key)
public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(Collection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders)
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders)
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, int numReducers, ColumnOrder... columnOrders)
```
As you can see, sorting is very convenient; we will use it in an example later on.
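Before we get to that example, here is a small hedged fragment showing typical calls (words and wordCounts are assumed to be an existing PCollection<String> and PTable<String, Long>):

```java
// Total order over a PCollection, descending.
PCollection<String> sortedWords = Sort.sort(words, Sort.Order.DESCENDING);
// Sort a PTable by key (ascending by default).
PTable<String, Long> sortedByKey = Sort.sort(wordCounts);
// Sort pairs by the second column, descending
// (the same call the WordCount example below uses).
PCollection<Pair<String, Long>> byCount =
        Sort.sortPairs(wordCounts, Sort.ColumnOrder.by(2, Sort.Order.DESCENDING));
```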
- Secondary sort (SecondarySort)
Secondary sort is implemented by the SecondarySort utility class, which likewise provides a set of static methods, as shown below:
```java
public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype)
public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype, int numReducers)
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype)
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype, int numReducers)
```
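To give a sense of how sortAndApply is used, here is a hedged sketch (not from the original examples): suppose each record is keyed by user ID with a (timestamp, action) value pair; the DoFn receives, for each user, the value pairs already sorted by the first element of the pair (the timestamp):

```java
// Concatenate each user's actions in time order; assumes Writable-based PTypes
// (org.apache.crunch.types.writable.Writables).
static PCollection<String> actionsInTimeOrder(PTable<String, Pair<Long, String>> events) {
    return SecondarySort.sortAndApply(events,
            new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() {
                @Override
                public void process(Pair<String, Iterable<Pair<Long, String>>> input,
                        Emitter<String> emitter) {
                    StringBuilder sb = new StringBuilder(input.first());
                    for (Pair<Long, String> p : input.second()) {
                        sb.append(" ").append(p.second());   // values arrive sorted by timestamp
                    }
                    emitter.emit(sb.toString());
                }
            },
            Writables.strings());
}
```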
- Deduplication (Distinct)
The Distinct utility class provides the following static methods:
```java
public static <S> PCollection<S> distinct(PCollection<S> input)
public static <K, V> PTable<K, V> distinct(PTable<K, V> input)
public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery)
public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery)
```
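Usage is as simple as the signatures suggest; in the hedged fragment below, lines is an assumed PCollection<String>:

```java
// Remove duplicate elements.
PCollection<String> unique = Distinct.distinct(lines);
// The flushEvery variant bounds how many elements are held in memory per task
// before the in-memory deduplication set is flushed (a memory/performance trade-off).
PCollection<String> uniqueTuned = Distinct.distinct(lines, 50000);
```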
- Sampling
The Sample utility class provides a set of static methods, as shown below:
```java
public static <S> PCollection<S> sample(PCollection<S> input, double probability)
public static <S> PCollection<S> sample(PCollection<S> input, Long seed, double probability)
public static <K, V> PTable<K, V> sample(PTable<K, V> input, double probability)
public static <K, V> PTable<K, V> sample(PTable<K, V> input, Long seed, double probability)
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize)
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize, Long seed)
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize)
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize, Long seed)
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(PTable<Integer, Pair<T, N>> input, int[] sampleSizes)
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(PTable<Integer, Pair<T, N>> input, int[] sampleSizes, Long seed)
```
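For example (a hedged fragment; lines is again an assumed PCollection<String>):

```java
// Keep each record independently with probability 0.1 (roughly 10% of the data).
PCollection<String> roughlyTenPercent = Sample.sample(lines, 0.1);
// Draw a fixed-size random sample using reservoir sampling.
PCollection<String> aboutOneThousand = Sample.reservoirSample(lines, 1000);
```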
- Running existing MapReduce code
The Mapreduce and Mapred utility classes (for the new and old Hadoop MapReduce APIs, respectively) let us reuse MapReduce programs we already have; their static methods are shown below:
```java
// Mapreduce utility class
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(
    PTable<K1, V1> input, Class<? extends Mapper<K1, V1, K2, V2>> mapperClass,
    Class<K2> keyClass, Class<V2> valueClass)
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(
    PGroupedTable<K1, V1> input, Class<? extends Reducer<K1, V1, K2, V2>> reducerClass,
    Class<K2> keyClass, Class<V2> valueClass)

// Mapred utility class
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(
    PTable<K1, V1> input, Class<? extends Mapper<K1, V1, K2, V2>> mapperClass,
    Class<K2> keyClass, Class<V2> valueClass)
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(
    PGroupedTable<K1, V1> input, Class<? extends Reducer<K1, V1, K2, V2>> reducerClass,
    Class<K2> keyClass, Class<V2> valueClass)
```
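A hedged sketch of such reuse might look like the following; ExistingMapper and ExistingReducer are hypothetical classes of your own written against the org.apache.hadoop.mapreduce API (Mapper<Text, Text, Text, IntWritable> and Reducer<Text, IntWritable, Text, IntWritable> respectively), and Text/IntWritable come from org.apache.hadoop.io:

```java
// Plug an existing Mapper/Reducer pair into a Crunch pipeline.
static PTable<Text, IntWritable> reuseExistingMapReduce(PTable<Text, Text> input) {
    PTable<Text, IntWritable> mapped =
            Mapreduce.map(input, ExistingMapper.class, Text.class, IntWritable.class);
    return Mapreduce.reduce(mapped.groupByKey(), ExistingReducer.class,
            Text.class, IntWritable.class);
}
```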
Below, we implement two examples to get a feel for how programming with Crunch differs from using the raw MapReduce API.
The WordCount word-frequency program
Expressed as a sequence of operations on the Crunch library, the WordCount program we want to implement breaks down into the following steps:
- Read a directory of text files from HDFS
- Split each line on whitespace into <word, per-occurrence count> pairs (the Map phase of a MapReduce program)
- Group the resulting collection by key
- Reduce to obtain each word's frequency as <word, global count> pairs (the Reduce phase of a MapReduce program)
- Sort the result set in descending order of global word count
- Write the result to HDFS
Based on these steps, the implementation using the Crunch programming library looks like this:
```java
package org.shirdrn.crunch.examples;

import static org.apache.crunch.types.writable.Writables.ints;
import static org.apache.crunch.types.writable.Writables.strings;
import static org.apache.crunch.types.writable.Writables.tableOf;

import java.io.Serializable;
import java.util.Iterator;

import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.ColumnOrder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Strings;

public class WordCount extends Configured implements Tool, Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: hadoop jar crunch-0.0.1-SNAPSHOT.jar "
                    + WordCount.class.getName() + " <input> <output>");
            return 1;
        }
        String inputPath = args[0];
        String outputPath = args[1];

        // Create a pipeline & read a text file
        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // map
        PTable<String, Integer> mappedWords = map(lines);
        // group by key
        PGroupedTable<String, Integer> groupedWords = mappedWords.groupByKey();
        // reduce
        PTable<String, Integer> reducedWords = reduce(groupedWords);
        // sort by global count, descending
        PCollection<Pair<String, Integer>> sortedValues =
                Sort.sortPairs(reducedWords, ColumnOrder.by(2, Sort.Order.DESCENDING));

        // write the result to a text file
        pipeline.writeTextFile(sortedValues, outputPath);

        // Execute the pipeline as MapReduce jobs
        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    private final PTable<String, Integer> map(PCollection<String> lines) {
        PTable<String, Integer> mappedWords = lines.parallelDo(
                new DoFn<String, Pair<String, Integer>>() {
                    private static final long serialVersionUID = 1L;
                    private static final String PATTERN = "\\s+";

                    @Override
                    public void process(String input, Emitter<Pair<String, Integer>> emitter) {
                        if (!Strings.isNullOrEmpty(input)) {
                            for (String word : input.split(PATTERN)) {
                                if (!Strings.isNullOrEmpty(word)) {
                                    emitter.emit(Pair.of(word, 1));
                                }
                            }
                        }
                    }
                }, tableOf(strings(), ints()));
        return mappedWords;
    }

    private final PTable<String, Integer> reduce(PGroupedTable<String, Integer> groupedWords) {
        PTable<String, Integer> reducedWords = groupedWords.combineValues(
                new CombineFn<String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void process(Pair<String, Iterable<Integer>> values,
                            Emitter<Pair<String, Integer>> emitter) {
                        int count = 0;
                        Iterator<Integer> iter = values.second().iterator();
                        while (iter.hasNext()) {
                            count += iter.next();
                        }
                        emitter.emit(Pair.of(values.first(), count));
                    }
                });
        return reducedWords;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }
}
```
Looking at the run method above, it is very clear what each step in the ordered sequence has to do to complete the computation. If we wrote the same thing against the MapReduce API, we could easily get lost in the mechanics of the API and lose an intuitive picture of the overall execution flow, which is especially hard on developers who are new to MapReduce.
The program is run with the same command format as one written against the MapReduce API. To run our Crunch-based program, execute the following command:
```
hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.WordCount /data/crunch/in /data/crunch/out
```
The console output looks like the following:
```
15/03/06 15:48:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/06 15:48:45 INFO impl.FileTargetImpl: Will write output files to new path: /data/crunch/out
15/03/06 15:48:45 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
15/03/06 15:48:46 INFO collect.PGroupedTableImpl: Setting num reduce tasks to 2
15/03/06 15:48:46 INFO client.RMProxy: Connecting to ResourceManager at h1/192.168.4.142:8032
15/03/06 15:48:47 INFO Configuration.deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize
15/03/06 15:48:47 INFO input.FileInputFormat: Total input paths to process : 1
15/03/06 15:48:47 INFO input.CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 2, size left: 51480009
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: number of splits:27
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0012
15/03/06 15:48:48 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0012
15/03/06 15:48:48 INFO mapreduce.Job: The url to track the job: http://h1:8088/proxy/application_1422867923292_0012/
15/03/06 15:48:48 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: Text(/data/crunch/in)+S0+GBK+combine+S1+SeqFile(/tmp/crun... ID=1 (1/2)"
15/03/06 15:48:48 INFO jobcontrol.CrunchControlledJob: Job status available at: http://h1:8088/proxy/application_1422867923292_0012/
15/03/06 15:54:57 INFO client.RMProxy: Connecting to ResourceManager at h1/192.168.4.142:8032
15/03/06 15:54:58 INFO input.FileInputFormat: Total input paths to process : 2
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: number of splits:24
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0013
15/03/06 15:54:58 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0013
15/03/06 15:54:58 INFO mapreduce.Job: The url to track the job: http://h1:8088/proxy/application_1422867923292_0013/
15/03/06 15:54:58 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: SeqFile(/tmp/crunch-1592377036/p1)+GBK+ungroup+PTables.va... ID=2 (2/2)"
15/03/06 15:54:58 INFO jobcontrol.CrunchControlledJob: Job status available at: http://h1:8088/proxy/application_1422867923292_0013/
```
As the log shows, the WordCount program we built with the Crunch library is translated into two jobs (application_1422867923292_0012 and application_1422867923292_0013) when it runs on the Hadoop cluster, which can also be confirmed on the YARN applications page.
Once the run succeeds, the result can be inspected from the command line, as shown below:
```
[hadoop@h1 ~]$ hdfs dfs -cat /data/crunch/out/part-r-00000 | head
15/03/06 16:24:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[0,31498202]
[2.3.14,6696801]
[1,4099030]
[2.3.3,3364573]
[2.2.1,3204483]
[appstore,2896280]
[2.1.1,2664336]
[3.0.3,2246132]
[2.3.9,1669198]
[3.0.5,1596954]
```
As we can see, the output matches what the program was designed to produce.
A table join program
Writing a join with the raw MapReduce API is quite involved; without a fairly deep understanding of how MapReduce executes, the implementation can be difficult, and the finished program is often not very readable either. Below we use the Crunch API to implement a join between two tables. First, prepare the data files as follows:
- User info table
The user info table lives in the file user_info.txt. The data comes from a mobile application and each line contains two TAB-separated fields: the user ID and the promotion channel the user came from. Sample lines look like this:
```
86756ed5cccd69494f6b94fc31638835 c9049310
c4b0657cf93fc4f05c87a76077e50090 c0000341
d5c33ce0a110ca3e2def912506bcd846 c9049310
7ed745aa273b67b7e9189256140c7220 c0087001
d31908a762c49c68826f9f5addb345cb c0000106
dd3ced7cf15ba397f84eb3389b6ffc11 c0049146
c60bc23264e61e3ba553f6127138c77a c6201204
a892a2001947d872435edddad44da7d7 c9049310
d31908a762c49c68826f9f5addb345cb c9001500
dd3ced7cf15ba397f84eb3389b6ffc11 c6820012
c60bc23264e61e3ba553f6127138c77a c0055102
a892a2001947d872435edddad44da7d7 c9049310
847383774f24b1904a9ada6af7028f52 c0049146
443f9f5c9f8c16a8a676bb567451a65f c6820012
a6b88a8ad016af214ba30aef03b450eb c0055102
f57cab9d9712e9884a5f4a373af8b220 c0055102
3762865e195e7885e41fe748671657c2 c0049146
34323bfbfe3ea8561952769af0bc117f c9000541
9b4e112db554d44ae5c8140a40f7d389 c0049146
f26fd561ab357cf2aad041eaa17c30b4 c0055102
2812d7c1641580784b19f7ef1b1fdbf2 c6820012
7ed745aa273b67b7e9189256140c7220 c9049310
19bacba14597d2e8994ce50a5d47c2b4 c0055102
B9BEDD6A957377FDEA2A78D4B614531A c9000201
7aee01193033491f12342a7b9dbbe4a3 c0000106
77b80cfc95b04ba427412ae5fc276d6f c0055102
e1f7c264271e1ee2e2a78d0798354d89 c6820012
68aeafdfd07e85b41faafb33fc63600c c0055102
556BB4023B23D4C8DD70E81FDD270121 c0049146
d31908a762c49c68826f9f5addb345cb c0055102
14b769541197dc391767ddbce878393b c0055102
c60bc23264e61e3ba553f6127138c77a c0000106
2708bb4bb8f967c607bc82749d892ebf c0055102
70F9442DAE5539ED99E280F12AFA4EC1 c9049310
```
- Channel info table
The channel info table lives in the file channel_info.txt. Each line contains two TAB-separated fields: the channel ID and the channel name. Sample lines look like this:
```
c0000341 Anet
c9049310 Bwater
c0087001 cks005
c0000106 bjrow
c6201204 nabEer
c9001500 moppppy
c0055102 susu
c0049146 pwest
c6820012 9square
c9000541 house8
c9000201 netfyy
```
Since the user table only stores the channel ID, but reports need to display the channel name, we join the two tables on the channel ID so that the report can show the channel name for each user.
Implementing this requirement with Crunch is simple and direct; the code is shown below:
```java
package org.shirdrn.crunch.examples;

import java.io.Serializable;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Join;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinUserChannel extends Configured implements Tool, Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: hadoop jar crunch-0.0.1-SNAPSHOT.jar "
                    + JoinUserChannel.class.getName() + " <user_input> <channel_input> <output>");
            return 1;
        }
        String userPath = args[0];
        String channelPath = args[1];
        String outputPath = args[2];

        // Create a pipeline & read 2 text files
        Pipeline pipeline = new MRPipeline(JoinUserChannel.class, getConf());

        // user data
        PCollection<String> users = pipeline.readTextFile(userPath);
        PTypeFamily uTF = users.getTypeFamily();
        PTable<String, String> left = users.parallelDo(new DoFn<String, Pair<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void process(String input, Emitter<Pair<String, String>> emitter) {
                String[] kv = input.split("\\s+");
                if (kv.length == 2) {
                    String userId = kv[0];
                    String channelId = kv[1].trim();
                    emitter.emit(Pair.of(channelId, userId)); // key=channelId, value=userId
                }
            }
        }, uTF.tableOf(uTF.strings(), uTF.strings()));

        // channel data
        PCollection<String> channels = pipeline.readTextFile(channelPath);
        PTypeFamily cTF = channels.getTypeFamily();
        PTable<String, String> right = channels.parallelDo(new DoFn<String, Pair<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void process(String input, Emitter<Pair<String, String>> emitter) {
                String[] kv = input.split("\\s+");
                if (kv.length == 2) {
                    String channelId = kv[0].trim();
                    String channelName = kv[1];
                    emitter.emit(Pair.of(channelId, channelName)); // key=channelId, value=channelName
                }
            }
        }, cTF.tableOf(cTF.strings(), cTF.strings()));

        // join the 2 tables & write to HDFS
        PTable<String, Pair<String, String>> joinedResult = Join.innerJoin(left, right);
        pipeline.writeTextFile(joinedResult, outputPath);

        // Execute the pipeline as MapReduce jobs
        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new JoinUserChannel(), args);
    }
}
```
We first read each of the two files into a PCollection<String>, transform each into a PTable<String, String>, then call the innerJoin method of the Join utility class to perform the inner join, which returns a PTable<String, Pair<String, String>> that is finally written to HDFS. For an inner join, you can also call the join method on the PTable<String, String> collection directly; it is functionally equivalent to the Join utility's innerJoin and join methods.
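For reference, the inner join in the code above could equally well be expressed with PTable's own join method:

```java
// Equivalent to Join.innerJoin(left, right) / Join.join(left, right)
PTable<String, Pair<String, String>> joinedResult = left.join(right);
```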
Run the program with the following command:
```
hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.JoinUserChannel /data/crunch/user/user_info.txt /data/crunch/user/channel_info.txt /data/crunch/user/joined
```
Check the result as follows:
```
[hadoop@h1 crunch]$ hdfs dfs -ls /data/crunch/user/joined
15/03/06 18:53:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2015-03-06 18:53 /data/crunch/user/joined/_SUCCESS
-rw-r--r--   3 hadoop supergroup       1777 2015-03-06 18:53 /data/crunch/user/joined/part-r-00000
[hadoop@h1 crunch]$ hdfs dfs -cat /data/crunch/user/joined/part-r-00000 | head
15/03/06 18:54:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[c0000106,[7aee01193033491f12342a7b9dbbe4a3,bjrow]]
[c0000106,[c60bc23264e61e3ba553f6127138c77a,bjrow]]
[c0000106,[d31908a762c49c68826f9f5addb345cb,bjrow]]
[c0000341,[c4b0657cf93fc4f05c87a76077e50090,Anet]]
[c0049146,[3762865e195e7885e41fe748671657c2,pwest]]
[c0049146,[556BB4023B23D4C8DD70E81FDD270121,pwest]]
[c0049146,[9b4e112db554d44ae5c8140a40f7d389,pwest]]
[c0049146,[dd3ced7cf15ba397f84eb3389b6ffc11,pwest]]
[c0049146,[847383774f24b1904a9ada6af7028f52,pwest]]
[c0055102,[f26fd561ab357cf2aad041eaa17c30b4,susu]]
```
Summary
The two examples above are only simple MapReduce programs developed with Crunch, but whether you are an experienced developer or a newcomer, it is very easy to get started.
At the moment, Crunch is still at an early stage: the latest release is 0.11, and it has only been tested against Hadoop 2.2.0 and earlier, so running it on platforms newer than 2.2.0 may expose some minor issues or rough edges. The example programs above were run on Hadoop 2.6.0; for instance, in the Hadoop configuration files on the node that submits the job, the block size cannot be given in the newer shorthand forms such as 64m or 2g, but has to be specified as a plain numeric byte count.
As Crunch continues to iterate, its features should become more complete and its API even cleaner, especially around runtime tuning: Crunch's goal is for programs to run efficiently with almost no complex tuning configuration, which is well worth looking forward to.
In addition, Crunch also supports Spark as an execution engine; for details on running Crunch on Spark, see the documentation on the official site.
Reference links
- http://crunch.apache.org/
- http://crunch.apache.org/getting-started.html
- http://crunch.apache.org/user-guide.html
- http://www.ashishpaliwal.com/blog/2012/11/crunching-data-with-apache-crunch-part-4/