Hadoop MapReduce处理海量小文件:基于CombineFileInputFormat

在使用Hadoop处理海量小文件的应用场景中,如果你选择使用CombineFileInputFormat,而且你是第一次使用,可能你会感到有点迷惑。虽然,从这个处理方案的思想上很容易理解,但是可能会遇到这样那样的问题。
使用CombineFileInputFormat作为Map任务的输入规格描述,首先需要实现一个自定义的RecordReader。
CombineFileInputFormat的大致原理是,他会将输入多个数据文件(小文件)的元数据全部包装到CombineFileSplit类里面。也就是说,因为小文件的情况下,在HDFS中都是单Block的文件,即一个文件一个Block,一个CombineFileSplit包含了一组文件Block,包括每个文件的起始偏移(offset),长度(length),Block位置(localtions)等元数据。如果想要处理一个CombineFileSplit,很容易想到,对其包含的每个InputSplit(实际上这里面没有这个,你需要读取一个小文件块的时候,需要构造一个FileInputSplit对象)。
在执行MapReduce任务的时候,需要读取文件的文本行(简单一点是文本行,也可能是其他格式数据)。那么对于CombineFileSplit来说,你需要处理其包含的小文件Block,就要对应设置一个RecordReader,才能正确读取文件数据内容。通常情况下,我们有一批小文件,格式通常是相同的,只需要在为CombineFileSplit实现一个RecordReader的时候,内置另一个用来读取小文件Block的RecordReader,这样就能保证读取CombineFileSplit内部聚积的小文件。

编程实现

通过上面的说明,我们基于Hadoop内置的CombineFileInputFormat来实现处理海量小文件,需要做的工作就很显然了,如下所示:

  1. 实现一个RecordReader来读取CombineFileSplit包装的文件Block
  2. 继承自CombineFileInputFormat实现一个使用我们自定义的RecordReader的输入规格说明类
  3. 处理数据的Mapper实现类
  4. 配置用来处理海量小文件的MapReduce Job

下面,对编程实现的过程,详细讲解:

  • CombineSmallfileRecordReader类

为CombineFileSplit实现一个RecordReader,并在内部使用Hadoop自带的LineRecordReader来读取小文件的文本行数据,代码实现如下所示:

package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> {

	private CombineFileSplit combineFileSplit;
	private LineRecordReader lineRecordReader = new LineRecordReader();
	private Path[] paths;
	private int totalLength;
	private int currentIndex;
	private float currentProgress = 0;
	private LongWritable currentKey;
	private BytesWritable currentValue = new BytesWritable();;

	public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index) throws IOException {
		super();
		this.combineFileSplit = combineFileSplit;
		this.currentIndex = index; // 当前要处理的小文件Block在CombineFileSplit中的索引
	}

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		this.combineFileSplit = (CombineFileSplit) split;
		// 处理CombineFileSplit中的一个小文件Block,因为使用LineRecordReader,需要构造一个FileSplit对象,然后才能够读取数据
		FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
		lineRecordReader.initialize(fileSplit, context);

		this.paths = combineFileSplit.getPaths();
		totalLength = paths.length;
		context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());
	}

	@Override
	public LongWritable getCurrentKey() throws IOException, InterruptedException {
		currentKey = lineRecordReader.getCurrentKey();
		return currentKey;
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException, InterruptedException {
		byte[] content = lineRecordReader.getCurrentValue().getBytes();
		currentValue.set(content, 0, content.length);
		return currentValue;
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (currentIndex >= 0 && currentIndex < totalLength) {
			return lineRecordReader.nextKeyValue();
		} else {
			return false;
		}
	}

	@Override
	public float getProgress() throws IOException {
		if (currentIndex >= 0 && currentIndex < totalLength) {
			currentProgress = (float) currentIndex / totalLength;
			return currentProgress;
		}
		return currentProgress;
	}

	@Override
	public void close() throws IOException {
		lineRecordReader.close();
	}
}

如果存在这样的应用场景,你的小文件具有不同的格式,那么久需要考虑对不同类型的小文件,使用不同的内置RecordReader,具体逻辑也是在上面的类中实现。

  • CombineSmallfileInputFormat类

我们已经为CombineFileSplit实现了一个RecordReader,然后需要在一个CombineFileInputFormat中注入这个RecordReader类实现类CombineSmallfileRecordReader的对象。这时,需要实现一个CombineFileInputFormat的子类,可以重写createRecordReader方法。我们实现的CombineSmallfileInputFormat,代码如下所示:

package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> {

	@Override
	public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {

		CombineFileSplit combineFileSplit = (CombineFileSplit) split;
		CombineFileRecordReader<LongWritable, BytesWritable> recordReader = new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class);
		try {
			recordReader.initialize(combineFileSplit, context);
		} catch (InterruptedException e) {
			new RuntimeException("Error to initialize CombineSmallfileRecordReader.");
		}
		return recordReader;
	}

}

上面比较重要的是,一定要通过CombineFileRecordReader来创建一个RecordReader,而且它的构造方法的参数必须是上面的定义的类型和顺序,构造方法包含3个参数:第一个是CombineFileSplit类型,第二个是TaskAttemptContext类型,第三个是Class<? extends RecordReader>类型。

  • CombineSmallfileMapper类

下面,我们实现我们的MapReduce任务实现类,CombineSmallfileMapper类代码,如下所示:

package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CombineSmallfileMapper extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {

	private Text file = new Text();

	@Override
	protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
		String fileName = context.getConfiguration().get("map.input.file.name");
		file.set(fileName);
		context.write(file, value);
	}

}

比较简单,就是将输入的文件文本行拆分成键值对,然后输出。

  • CombineSmallfiles类

下面看我们的主方法入口类,这里面需要配置我之前实现的MapReduce Job,实现代码如下所示:

package org.shirdrn.kodz.inaction.hadoop.smallfiles.combine;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.shirdrn.kodz.inaction.hadoop.smallfiles.IdentityReducer;

public class CombineSmallfiles {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: conbinesmallfiles <in> <out>");
			System.exit(2);
		}

		conf.setInt("mapred.min.split.size", 1);
		conf.setLong("mapred.max.split.size", 26214400); // 25m

		conf.setInt("mapred.reduce.tasks", 5);

		Job job = new Job(conf, "combine smallfiles");
		job.setJarByClass(CombineSmallfiles.class);
		job.setMapperClass(CombineSmallfileMapper.class);
		job.setReducerClass(IdentityReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(BytesWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);

		job.setInputFormatClass(CombineSmallfileInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		int exitFlag = job.waitForCompletion(true) ? 0 : 1;
		System.exit(exitFlag);

	}

}

运行程序

下面看一下,我们经过处理后,将小文件合并的结果,从而更利于使用Hadoop MapReduce框架进行高效地计算。

  • 准备工作
jar -cvf combine-smallfiles.jar -C ./ org/shirdrn/kodz/inaction/hadoop/smallfiles
xiaoxiang@ubuntu3:~$ cd /opt/comodo/cloud/hadoop-1.0.3
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -mkdir /user/xiaoxiang/datasets/smallfiles
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/comodo/cloud/dataset/smallfiles/* /user/xiaoxiang/datasets/smallfiles
  • 运行结果
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop jar combine-smallfiles.jar org.shirdrn.kodz.inaction.hadoop.smallfiles.combine.CombineSmallfiles /user/xiaoxiang/datasets/smallfiles /user/xiaoxiang/output/smallfiles/combine
13/03/23 21:52:09 INFO input.FileInputFormat: Total input paths to process : 117
13/03/23 21:52:09 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/23 21:52:09 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/23 21:52:10 INFO mapred.JobClient: Running job: job_201303111631_0038
13/03/23 21:52:11 INFO mapred.JobClient:  map 0% reduce 0%
13/03/23 21:52:29 INFO mapred.JobClient:  map 33% reduce 0%
13/03/23 21:52:32 INFO mapred.JobClient:  map 55% reduce 0%
13/03/23 21:52:35 INFO mapred.JobClient:  map 76% reduce 0%
13/03/23 21:52:38 INFO mapred.JobClient:  map 99% reduce 0%
13/03/23 21:52:41 INFO mapred.JobClient:  map 100% reduce 0%
13/03/23 21:53:02 INFO mapred.JobClient:  map 100% reduce 20%
13/03/23 21:53:05 INFO mapred.JobClient:  map 100% reduce 40%
13/03/23 21:53:14 INFO mapred.JobClient:  map 100% reduce 60%
13/03/23 21:53:17 INFO mapred.JobClient:  map 100% reduce 80%
13/03/23 21:53:32 INFO mapred.JobClient:  map 100% reduce 100%
13/03/23 21:53:37 INFO mapred.JobClient: Job complete: job_201303111631_0038
13/03/23 21:53:37 INFO mapred.JobClient: Counters: 28
13/03/23 21:53:37 INFO mapred.JobClient:   Job Counters
13/03/23 21:53:37 INFO mapred.JobClient:     Launched reduce tasks=5
13/03/23 21:53:37 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=33515
13/03/23 21:53:37 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/23 21:53:37 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/03/23 21:53:37 INFO mapred.JobClient:     Launched map tasks=1
13/03/23 21:53:37 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=69085
13/03/23 21:53:37 INFO mapred.JobClient:   File Output Format Counters
13/03/23 21:53:37 INFO mapred.JobClient:     Bytes Written=237510415
13/03/23 21:53:37 INFO mapred.JobClient:   FileSystemCounters
13/03/23 21:53:37 INFO mapred.JobClient:     FILE_BYTES_READ=508266867
13/03/23 21:53:37 INFO mapred.JobClient:     HDFS_BYTES_READ=147037765
13/03/23 21:53:37 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=722417364
13/03/23 21:53:37 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=237510415
13/03/23 21:53:37 INFO mapred.JobClient:   File Input Format Counters
13/03/23 21:53:37 INFO mapred.JobClient:     Bytes Read=0
13/03/23 21:53:37 INFO mapred.JobClient:   Map-Reduce Framework
13/03/23 21:53:37 INFO mapred.JobClient:     Map output materialized bytes=214110010
13/03/23 21:53:37 INFO mapred.JobClient:     Map input records=3510000
13/03/23 21:53:37 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/03/23 21:53:37 INFO mapred.JobClient:     Spilled Records=11840717
13/03/23 21:53:37 INFO mapred.JobClient:     Map output bytes=207089980
13/03/23 21:53:37 INFO mapred.JobClient:     CPU time spent (ms)=64200
13/03/23 21:53:37 INFO mapred.JobClient:     Total committed heap usage (bytes)=722665472
13/03/23 21:53:37 INFO mapred.JobClient:     Combine input records=0
13/03/23 21:53:37 INFO mapred.JobClient:     SPLIT_RAW_BYTES=7914
13/03/23 21:53:37 INFO mapred.JobClient:     Reduce input records=3510000
13/03/23 21:53:37 INFO mapred.JobClient:     Reduce input groups=117
13/03/23 21:53:37 INFO mapred.JobClient:     Combine output records=0
13/03/23 21:53:37 INFO mapred.JobClient:     Physical memory (bytes) snapshot=820969472
13/03/23 21:53:37 INFO mapred.JobClient:     Reduce output records=3510000
13/03/23 21:53:37 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3257425920
13/03/23 21:53:37 INFO mapred.JobClient:     Map output records=3510000
  • 验证结果
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -text /user/xiaoxiang/output/smallfiles/combine/part-r-00000 | head -5
data_50000_000     44 4a 20 32 31 34 34 30 30 39 39 38 37 32 31 36 20 32 31 34 34 30 31 30 30 30 32 30 39 37 20 32 32 31 34 35 32 31 34 35
data_50000_000     44 45 20 32 31 34 34 30 30 39 39 38 37 37 33 32 20 32 31 34 34 30 31 30 30 30 31 32 34 31 20 31 38 32 34 39 37 32 37 34
data_50000_000     42 57 20 32 31 34 34 30 30 39 39 36 39 36 33 30 20 32 31 34 34 30 31 30 30 30 30 33 38 35 20 39 34 35 38 34 39 39 31 37
data_50000_000     50 59 20 32 31 34 34 30 30 39 39 37 37 34 35 34 20 32 31 34 34 30 30 39 39 39 39 35 32 39 20 34 38 37 33 32 33 34 39 37
data_50000_000     4d 4c 20 32 31 34 34 30 30 39 39 37 33 35 35 36 20 32 31 34 34 30 30 39 39 39 38 36 37 33 20 36 33 30 38 36 32 34 36 31
xiaoxiang@ubuntu3:/opt/comodo/cloud/hadoop-1.0.3$ bin/hadoop fs -text /user/xiaoxiang/output/smallfiles/combine/part-r-00000 | tail -5
data_50000_230     43 52 20 32 31 34 38 36 36 38 31 36 38 36 38 36 20 32 31 34 38 36 36 38 31 39 35 30 36 38 20 36 39 35 39 38 38 34 30 33
data_50000_230     50 52 20 32 31 34 38 36 36 38 31 36 35 36 34 36 20 32 31 34 38 36 36 38 31 39 34 36 34 30 20 38 34 30 36 35 31 39 38 38
data_50000_230     53 52 20 32 31 34 38 36 36 38 31 36 36 34 38 37 20 32 31 34 38 36 36 38 31 39 34 36 34 30 20 37 39 32 35 36 38 32 38 30
data_50000_230     4d 43 20 32 31 34 38 36 36 38 31 36 39 32 34 32 20 32 31 34 38 36 36 38 31 39 34 32 31 31 20 36 32 33 34 34 38 32 30 30
data_50000_230     4c 49 20 32 31 34 38 36 36 38 31 38 38 38 38 34 20 32 31 34 38 36 36 38 31 39 33 37 38 33 20 32 34 30 30 33 34 36 38 38

输出的文件格式,键是文件名称,值是该文件中的每一行文本数据。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(7): “Hadoop MapReduce处理海量小文件:基于CombineFileInputFormat

  1. 大哥,org.shirdrn.kodz.inaction.hadoop.smallfiles.IdentityReducer; 你的这个类好像没有传上来哦,能发我一份么 谢谢了。

  2. 不错,现在hadoop已经有自带的实现了:org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

  3. 我发现一个bug,是
    byte[] content = lineRecordReader.getCurrentValue().getBytes();
    currentValue.set(content, 0, content.length);

    ##############################
    Returns the raw bytes; however, only data up to getLength() is valid.

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>