- 实现一个RecordReader来读取CombineFileSplit包装的文件Block
- 继承自CombineFileInputFormat实现一个使用我们自定义的RecordReader的输入规格说明类
- 处理数据的Mapper实现类
- 配置用来处理海量小文件的MapReduce Job
- CombineSmallfileRecordReader类
package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

/**
 * Reads one small-file block out of a {@link CombineFileSplit}, line by line.
 *
 * <p>Each instance handles the single block identified by the index passed to the
 * constructor and delegates the actual reading to a {@link LineRecordReader}. Keys are
 * whatever the line reader produces; values are the bytes of the current line.
 */
public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> {

    private CombineFileSplit combineFileSplit;
    private final LineRecordReader lineRecordReader = new LineRecordReader();
    private Path[] paths;
    private int totalLength;
    private int currentIndex;
    private float currentProgress = 0;
    private LongWritable currentKey;
    private final BytesWritable currentValue = new BytesWritable();

    /**
     * @param combineFileSplit the combined split that contains the block to read
     * @param context          the task attempt context (not used by the constructor itself)
     * @param index            index, within the combined split, of the small-file block
     *                         this reader is responsible for
     * @throws IOException declared for signature compatibility; not thrown here
     */
    public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit,
            TaskAttemptContext context, Integer index) throws IOException {
        super();
        this.combineFileSplit = combineFileSplit;
        // Index of the small-file block currently being processed inside the CombineFileSplit.
        this.currentIndex = index;
    }

    /**
     * Prepares the underlying line reader for the block at {@code currentIndex} and
     * publishes that block's file name under the {@code map.input.file.name}
     * configuration key.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.combineFileSplit = (CombineFileSplit) split;
        // LineRecordReader needs a FileSplit, so build one describing just the
        // current small-file block of the combined split.
        FileSplit fileSplit = new FileSplit(
                combineFileSplit.getPath(currentIndex),
                combineFileSplit.getOffset(currentIndex),
                combineFileSplit.getLength(currentIndex),
                combineFileSplit.getLocations());
        lineRecordReader.initialize(fileSplit, context);

        this.paths = combineFileSplit.getPaths();
        totalLength = paths.length;
        context.getConfiguration().set("map.input.file.name",
                combineFileSplit.getPath(currentIndex).getName());
    }

    /** Returns the key of the current record as reported by the line reader. */
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        currentKey = lineRecordReader.getCurrentKey();
        return currentKey;
    }

    /**
     * Returns the bytes of the current line.
     *
     * <p>{@code Text.getBytes()} exposes the backing array, of which only the first
     * {@code getLength()} bytes are valid. Copying the whole array would append
     * leftover bytes from earlier, longer lines, so only the valid prefix is copied.
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        Text line = lineRecordReader.getCurrentValue();
        currentValue.set(line.getBytes(), 0, line.getLength());
        return currentValue;
    }

    /** Advances the line reader, provided the block index is inside the split. */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            return lineRecordReader.nextKeyValue();
        } else {
            return false;
        }
    }

    /**
     * Reports progress as the ratio of the block index to the number of paths in the
     * combined split; the last computed value is returned when the index is out of range.
     */
    @Override
    public float getProgress() throws IOException {
        // NOTE(review): this is index-based, not byte-based; confirm this is what the
        // enclosing CombineFileRecordReader expects from a per-block reader.
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
            return currentProgress;
        }
        return currentProgress;
    }

    /** Closes the underlying line reader. */
    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
- CombineSmallfileInputFormat类
package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Input format that reads combined splits through {@link CombineSmallfileRecordReader}.
 */
public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> {

    /**
     * Wraps the given split in a {@link CombineFileRecordReader} that uses
     * {@link CombineSmallfileRecordReader} for each block, and initializes it.
     *
     * @param split   the split to read; must be a {@link CombineFileSplit}
     * @param context the task attempt context
     * @return the initialized record reader
     * @throws IOException      if creating or initializing the reader fails
     * @throws RuntimeException if the thread is interrupted during initialization
     */
    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        CombineFileSplit combineFileSplit = (CombineFileSplit) split;
        CombineFileRecordReader<LongWritable, BytesWritable> recordReader =
                new CombineFileRecordReader<LongWritable, BytesWritable>(
                        combineFileSplit, context, CombineSmallfileRecordReader.class);
        try {
            recordReader.initialize(combineFileSplit, context);
        } catch (InterruptedException e) {
            // The original code constructed this exception without throwing it, which
            // silently discarded the interruption. Restore the flag and propagate.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error to initialize CombineSmallfileRecordReader.", e);
        }
        return recordReader;
    }
}
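上面比较重要的是,一定要通过CombineFileRecordReader来创建RecordReader,而且它的构造方法的参数必须符合下面的类型和顺序,共3个参数:第一个是CombineFileSplit类型,第二个是TaskAttemptContext类型,第三个是Class<? extends RecordReader>类型。相应地,我们自定义的RecordReader(即CombineSmallfileRecordReader)也必须提供一个参数依次为CombineFileSplit、TaskAttemptContext、Integer的构造方法,因为CombineFileRecordReader会通过反射调用它来为每个小文件Block创建RecordReader。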
- CombineSmallfileMapper类
package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Re-keys every input record by the name of the file it came from.
 *
 * <p>The file name is read from the {@code map.input.file.name} configuration entry;
 * the record value is passed through unchanged.
 */
public class CombineSmallfileMapper extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {

    /** Reused output key holding the current input file name. */
    private Text fileNameKey = new Text();

    /**
     * Emits {@code (file name, value)} for the given record; the input key is ignored.
     */
    @Override
    protected void map(LongWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        fileNameKey.set(context.getConfiguration().get("map.input.file.name"));
        context.write(fileNameKey, value);
    }
}
- CombineSmallfiles类
下面看我们的主方法入口类,这里面需要配置我之前实现的MapReduce Job,实现代码如下所示:
package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.shirdrn.kodz.inaction.hadoop.smallfiles.IdentityReducer;

/**
 * Driver that configures and runs the small-file combining MapReduce job.
 *
 * <p>Expects exactly two non-generic arguments: the input path and the output path.
 */
public class CombineSmallfiles {

    /** Upper bound for a combined split: 25 MB. */
    private static final long MAX_SPLIT_SIZE_BYTES = 26214400L;

    /** Lower bound for a split, in bytes. */
    private static final int MIN_SPLIT_SIZE_BYTES = 1;

    /** Number of reduce tasks requested for the job. */
    private static final int REDUCE_TASKS = 5;

    /**
     * Parses the command line, sets up the job and exits with 0 on success, 1 on job
     * failure, or 2 when the arguments are wrong.
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            // Fixed the misspelled tool name in the usage text.
            System.err.println("Usage: combinesmallfiles <in> <out>");
            System.exit(2);
        }

        // Split sizing and reducer count must be set before the Job copies the configuration.
        conf.setInt("mapred.min.split.size", MIN_SPLIT_SIZE_BYTES);
        conf.setLong("mapred.max.split.size", MAX_SPLIT_SIZE_BYTES);
        conf.setInt("mapred.reduce.tasks", REDUCE_TASKS);

        Job job = new Job(conf, "combine smallfiles");
        job.setJarByClass(CombineSmallfiles.class);
        job.setMapperClass(CombineSmallfileMapper.class);
        job.setReducerClass(IdentityReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setInputFormatClass(CombineSmallfileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}
下面看一下,我们经过处理后,将小文件合并的结果,从而更利于使用Hadoop MapReduce框架进行高效地计算。
- 准备工作
jar -cvf combine-smallfiles.jar -C ./ org/shirdrn/kodz/inaction/hadoop/smallfiles xiaoxiang@ubuntu3:~$ cd /opt/comodo/cloud/hadoop-1.0.3 xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -mkdir /user/xiaoxiang/datasets/smallfiles xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/comodo/cloud/dataset/smallfiles/* /user/xiaoxiang/datasets/smallfiles
- 运行结果
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop jar combine-smallfiles.jar org.shirdrn.kodz.inaction.hadoop.smallfiles.combine.CombineSmallfiles /user/xiaoxiang/datasets/smallfiles /user/xiaoxiang/output/smallfiles/combine 13/03/23 21:52:09 INFO input.FileInputFormat: Total input paths to process : 117 13/03/23 21:52:09 INFO util.NativeCodeLoader: Loaded the native-hadoop library 13/03/23 21:52:09 WARN snappy.LoadSnappy: Snappy native library not loaded 13/03/23 21:52:10 INFO mapred.JobClient: Running job: job_201303111631_0038 13/03/23 21:52:11 INFO mapred.JobClient: map 0% reduce 0% 13/03/23 21:52:29 INFO mapred.JobClient: map 33% reduce 0% 13/03/23 21:52:32 INFO mapred.JobClient: map 55% reduce 0% 13/03/23 21:52:35 INFO mapred.JobClient: map 76% reduce 0% 13/03/23 21:52:38 INFO mapred.JobClient: map 99% reduce 0% 13/03/23 21:52:41 INFO mapred.JobClient: map 100% reduce 0% 13/03/23 21:53:02 INFO mapred.JobClient: map 100% reduce 20% 13/03/23 21:53:05 INFO mapred.JobClient: map 100% reduce 40% 13/03/23 21:53:14 INFO mapred.JobClient: map 100% reduce 60% 13/03/23 21:53:17 INFO mapred.JobClient: map 100% reduce 80% 13/03/23 21:53:32 INFO mapred.JobClient: map 100% reduce 100% 13/03/23 21:53:37 INFO mapred.JobClient: Job complete: job_201303111631_0038 13/03/23 21:53:37 INFO mapred.JobClient: Counters: 28 13/03/23 21:53:37 INFO mapred.JobClient: Job Counters 13/03/23 21:53:37 INFO mapred.JobClient: Launched reduce tasks=5 13/03/23 21:53:37 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=33515 13/03/23 21:53:37 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 13/03/23 21:53:37 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 13/03/23 21:53:37 INFO mapred.JobClient: Launched map tasks=1 13/03/23 21:53:37 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=69085 13/03/23 21:53:37 INFO mapred.JobClient: File Output Format Counters 13/03/23 21:53:37 INFO mapred.JobClient: Bytes 
Written=237510415 13/03/23 21:53:37 INFO mapred.JobClient: FileSystemCounters 13/03/23 21:53:37 INFO mapred.JobClient: FILE_BYTES_READ=508266867 13/03/23 21:53:37 INFO mapred.JobClient: HDFS_BYTES_READ=147037765 13/03/23 21:53:37 INFO mapred.JobClient: FILE_BYTES_WRITTEN=722417364 13/03/23 21:53:37 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=237510415 13/03/23 21:53:37 INFO mapred.JobClient: File Input Format Counters 13/03/23 21:53:37 INFO mapred.JobClient: Bytes Read=0 13/03/23 21:53:37 INFO mapred.JobClient: Map-Reduce Framework 13/03/23 21:53:37 INFO mapred.JobClient: Map output materialized bytes=214110010 13/03/23 21:53:37 INFO mapred.JobClient: Map input records=3510000 13/03/23 21:53:37 INFO mapred.JobClient: Reduce shuffle bytes=0 13/03/23 21:53:37 INFO mapred.JobClient: Spilled Records=11840717 13/03/23 21:53:37 INFO mapred.JobClient: Map output bytes=207089980 13/03/23 21:53:37 INFO mapred.JobClient: CPU time spent (ms)=64200 13/03/23 21:53:37 INFO mapred.JobClient: Total committed heap usage (bytes)=722665472 13/03/23 21:53:37 INFO mapred.JobClient: Combine input records=0 13/03/23 21:53:37 INFO mapred.JobClient: SPLIT_RAW_BYTES=7914 13/03/23 21:53:37 INFO mapred.JobClient: Reduce input records=3510000 13/03/23 21:53:37 INFO mapred.JobClient: Reduce input groups=117 13/03/23 21:53:37 INFO mapred.JobClient: Combine output records=0 13/03/23 21:53:37 INFO mapred.JobClient: Physical memory (bytes) snapshot=820969472 13/03/23 21:53:37 INFO mapred.JobClient: Reduce output records=3510000 13/03/23 21:53:37 INFO mapred.JobClient: Virtual memory (bytes) snapshot=3257425920 13/03/23 21:53:37 INFO mapred.JobClient: Map output records=3510000
- 验证结果
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -text /user/xiaoxiang/output/smallfiles/combine/part-r-00000 | head -5 data_50000_000 44 4a 20 32 31 34 34 30 30 39 39 38 37 32 31 36 20 32 31 34 34 30 31 30 30 30 32 30 39 37 20 32 32 31 34 35 32 31 34 35 data_50000_000 44 45 20 32 31 34 34 30 30 39 39 38 37 37 33 32 20 32 31 34 34 30 31 30 30 30 31 32 34 31 20 31 38 32 34 39 37 32 37 34 data_50000_000 42 57 20 32 31 34 34 30 30 39 39 36 39 36 33 30 20 32 31 34 34 30 31 30 30 30 30 33 38 35 20 39 34 35 38 34 39 39 31 37 data_50000_000 50 59 20 32 31 34 34 30 30 39 39 37 37 34 35 34 20 32 31 34 34 30 30 39 39 39 39 35 32 39 20 34 38 37 33 32 33 34 39 37 data_50000_000 4d 4c 20 32 31 34 34 30 30 39 39 37 33 35 35 36 20 32 31 34 34 30 30 39 39 39 38 36 37 33 20 36 33 30 38 36 32 34 36 31 xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -text /user/xiaoxiang/output/smallfiles/combine/part-r-00000 | tail -5 data_50000_230 43 52 20 32 31 34 38 36 36 38 31 36 38 36 38 36 20 32 31 34 38 36 36 38 31 39 35 30 36 38 20 36 39 35 39 38 38 34 30 33 data_50000_230 50 52 20 32 31 34 38 36 36 38 31 36 35 36 34 36 20 32 31 34 38 36 36 38 31 39 34 36 34 30 20 38 34 30 36 35 31 39 38 38 data_50000_230 53 52 20 32 31 34 38 36 36 38 31 36 36 34 38 37 20 32 31 34 38 36 36 38 31 39 34 36 34 30 20 37 39 32 35 36 38 32 38 30 data_50000_230 4d 43 20 32 31 34 38 36 36 38 31 36 39 32 34 32 20 32 31 34 38 36 36 38 31 39 34 32 31 31 20 36 32 33 34 34 38 32 30 30 data_50000_230 4c 49 20 32 31 34 38 36 36 38 31 38 38 38 38 34 20 32 31 34 38 36 36 38 31 39 33 37 38 33 20 32 34 30 30 33 34 36 38 38

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
byte[] content = lineRecordReader.getCurrentValue().getBytes();
currentValue.set(content, 0, content.length);
Returns the raw bytes; however, only data up to getLength() is valid.
