HDFS格式化过程分析

我们知道,Namenode启动时可以指定不同的选项,当指定-format选项时,就是格式化Namenode,可以在Namenode类中看到格式化的方法,方法签名如下所示: private static boolean format(Configuration conf, boolean isConfirmationNeeded, boolean isInteractive) throws IOException 在该方法中,首先调用FSNamesystem类的方法,获取到待格式化的name目录和edit目录: Collection<File> editDirsToFormat = Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf); FSNamesystem.getNamespaceEditsDirs(conf); 跟踪调用FSNamesystem类的方法,可以看到,实际上获取到的目录为: name目录:是根据配置的dfs.name.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 edit目录:是根据配置的dfs.name.edits.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 在上面format方法中,创建对应的name目录和edit目录,对应如下代码行: FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,

Sqoop-1.4.4安装配置及基本使用

Sqoop是Apache旗下的开源项目,能够在Hadoop和结构化存储系统之间进行数据传输(导入、导出),尤其是当下应用非常广泛的关系型数据库。通常,可能很多业务数据都存储在关系型数据库中,当数据规模达到一定程度后,使用关系型数据对数据进行分析可能会存在一定的瓶颈,如上亿级别记录的复杂统计分析等。将关系型数据库中的数据同步到Hadoop平台上,借助Hadoop平台的可扩展的优势,可以进行复杂的统计分析,处理效率会有一定程度的提升。 下面,我们通过安装配置Sqoop,来体验一下Sqoop在Hadoop和MySQL之间进行数据同步的特性。 准备和配置 我们在使用的主机及其应用进程部署情况,如下所示: 节点m1(10.95.3.56):Sqoop-1.4.4,Hive-0.12.0,Namenode、JobTracker MySQL节点(10.95.3.49):MySQL数据库 我们先验证Sqoop能够成功连接MySQL数据库,然后验证将MySQL数据库表中的数据,同步到Hive中。 首先,在CentOS 6.4下安装MySQL数据库(服务器IP为:10.95.3.49): rpm -qa | grep mysql sudo rpm -e --nodeps mysql yum list | grep mysql sudo yum ins

Hive JOIN使用详解

Hive是基于Hadoop平台的,它提供了类似SQL一样的查询语言HQL。有了Hive,如果使用过SQL语言,并且不理解Hadoop MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的MR Job。 对于最基本的HQL查询我们不再累述,这里主要说明Hive中进行统计分析时使用到的JOIN操作。在说明Hive JOIN之前,我们先简单说明一下,Hadoop执行MR Job的基本过程(运行机制),能更好的帮助我们理解HQL转换到底层的MR Job后是如何执行的。我们重点说明MapReduce执行过程中,从Map端到Reduce端这个过程(Shuffle)的执行情况,如图所示(来自《Hadoop: The Definitive Guide》): 基本执行过程,描述如下: 一个InputSplit输入到map,会运行我们实现的Mapper的处理逻辑,对数据进行映射操作。 map输出时,会首先将输出中间结果写入到map自带的buffer中(buffer默认大小为100M,可以通过io.sort.mb配置)。 map自带的buffer使用容

Hadoop-2.2.0集群安装配置实践

Hadoop 2.x和1.x已经大不相同了,应该说对于存储计算都更加通用了。Hadoop 2.x实现了用来管理集群资源的YARN框架,可以面向任何需要使用基于HDFS存储来计算的需要,当然MapReduce现在已经作为外围的插件式的计算框架,你可以根据需要开发或者选择合适的计算框架。目前,貌似对MapReduce支持还是比较好的,毕竟MapReduce框架已经还算成熟。其他一些基于YARN框架的标准也在开发中。 YARN框架的核心是资源的管理和分配调度,它比Hadoop 1.x中的资源分配的粒度更细了,也更加灵活了,它的前景应该不错。由于极大地灵活性,所以在使用过程中由于这些配置的灵活性,可能使用的难度也加大了一些。另外,我个人觉得,YARN毕竟还在发展之中,也有很多不成熟的地方,各种问题频频出现,资料也相对较少,官方文档有时更新也不是很及时,如果我选择做海量数据处理,可能YARN还不能满足生产环境的需要。如果完全使用MapReduce来做计算,还是选择相对更加成熟的Hadoop 1.x版本用于生产环境。 下面使用4台机器,操作系统为CentOS 6.4 64位,一台做主节点,另外三台做从节点,实践集

Hadoop Job使用第三方依赖jar文件

当我们实现了一个Hadoop MapReduce Job以后,而这个Job可能又依赖很多外部的jar文件,在Hadoop集群上运行时,有时会出现找不到具体Class的异常。出现这种问题,基本上就是在Hadoop Job执行过程中,没有从执行的上下文中找到对应的jar文件(实际是unjar的目录,目录里面是对应的Class文件)。所以,我们自然而然想到,正确配置好对应的classpath,MapReduce Job运行时就能够找到。 有两种方式可以更好地实现,一种是设置HADOOP_CLASSPATH,将Job所依赖的jar文件加载到HADOOP_CLASSPATH,这种配置只针对该Job生效,Job结束之后HADOOP_CLASSPATH会被清理;另一种方式是,直接在构建代码的时候,将依赖jar文件与Job代码打成一个jar文件,这种方式可能会使得最终的jar文件比较大,但是结合一些代码构建工具,如Maven,可以在依赖控制方面保持一个Job一个依赖的构建配置,便于管理。下面,我们分别说明这两种方式。 设置HADOOP_CLASSPATH 比如,我们有一个使用HBase的应用,操作HBase数据库中表,肯定需要ZooKeeper,所以对应的jar文件的位置都要设置正确,让运行时Job能够

Hadoop Streaming原理及实践

Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Hadoop并行计算框架的优势和能力,来处理大数据。需要注意的是,Streaming方式是基于Unix系统的标准输入输出来进行MapReduce Job的运行,它区别与Pipes的地方主要是通信协议,Pipes使用的是Socket通信,是对使用C++语言来实现MapReduce Job并通过Socket通信来与Hadopp平台通信,完成Job的执行。任何支持标准输入输出特性的编程语言都可以使用Streaming方式来实现MapReduce Job,基本原理就是输入从Unix系统标准输入,输出使用Unix系统的标准输出。 Hadoop是使用Java语言编写的,所以最直接的方式的就是使用Java语言来实现Mapper和Reducer,然后配置MapReduce Job,提交到集群计算环境来完成计算。但是很多开发者可能对Java并不熟悉,而是对一些具有脚本特性的语言,如C++、Shell、Python、 Ruby、PHP、Perl有实际开发经验,Hadoop Streaming为这一类开发者提供了使用Hadoop集群来进行处理数据的工具,即工具包

Hadoop MapReduce处理海量小文件:压缩文件

在HDFS上存储文件,大量的小文件是非常消耗NameNode内存的,因为每个文件都会分配一个文件描述符,NameNode需要在启动的时候加载全部文件的描述信息,所以文件越多,对 NameNode来说开销越大。 我们可以考虑,将小文件压缩以后,再上传到HDFS中,这时只需要一个文件描述符信息,自然大大减轻了NameNode对内存使用的开销。MapReduce计算中,Hadoop内置提供了如下几 种压缩格式: DEFLATE gzip bzip2 LZO 使用压缩文件进行MapReduce计算,它的开销在于解压缩所消耗的时间,在特定的应用场景中这个也是应该考虑的问题。不过对于海量小文件的应用场景,我们压缩了小文件,却换 来的Locality特性。 假如成百上千的小文件压缩后只有一个Block,那么这个Block必然存在一个DataNode节点上,在计算的时候输入一个InputSplit,没有网络间传输数据的开销,而且是在本地进行 运算。倘若直接将小文件上传到HDFS上,成百上千的小Block分布在不同DataNode节点上,为了计算可能需要“移动数据”之后才能进行计算。文件很少的情况下,除了NameNode内 存使用开销以外,可能感觉不到网

Hadoop MapReduce处理海量小文件:基于CombineFileInputFormat

在使用Hadoop处理海量小文件的应用场景中,如果你选择使用CombineFileInputFormat,而且你是第一次使用,可能你会感到有点迷惑。虽然,从这个处理方案的思想上很容易理解,但是可能会遇到这样那样的问题。 使用CombineFileInputFormat作为Map任务的输入规格描述,首先需要实现一个自定义的RecordReader。 CombineFileInputFormat的大致原理是,他会将输入多个数据文件(小文件)的元数据全部包装到CombineFileSplit类里面。也就是说,因为小文件的情况下,在HDFS中都是单Block的文件,即一个文件一个Block,一个CombineFileSplit包含了一组文件Block,包括每个文件的起始偏移(offset),长度(length),Block位置(localtions)等元数据。如果想要处理一个CombineFileSplit,很容易想到,对其包含的每个InputSplit(实际上这里面没有这个,你需要读取一个小文件块的时候,需要构造一个FileInputSplit对象)。 在执行MapReduce任务的时候,需要读取文件的文本行(简单一点是文本行,也可能是其他格式数据)。那么对于CombineFileSplit来说,你需要处理其包含的小文

Hadoop MapReduce处理海量小文件:自定义InputFormat和RecordReader

一般来说,基于Hadoop的MapReduce框架来处理数据,主要是面向海量大数据,对于这类数据,Hadoop能够使其真正发挥其能力。对于海量小文件,不是说不能使用Hadoop来处理,只不过直接进行处理效率不会高,而且海量的小文件对于HDFS的架构设计来说,会占用NameNode大量的内存来保存文件的元数据(Bookkeeping)。另外,由于文件比较小,我们是指远远小于HDFS默认Block大小(64M),比如1k~2M,都很小了,在进行运算的时候,可能无法最大限度地充分Locality特性带来的优势,导致大量的数据在集群中传输,开销很大。 但是,实际应用中,也存在类似的场景,海量的小文件的处理需求也大量存在。那么,我们在使用Hadoop进行计算的时候,需要考虑将小数据转换成大数据,比如通过合并压缩等方法,可以使其在一定程度上,能够提高使用Hadoop集群计算方式的适应性。Hadoop也内置了一些解决方法,而且提供的API,可以很方便地实现。 下面,我们通过自定义InputFormat和RecordReader来实现对海量小文件的并行处理。 基本思路描述如下: 在Mapper中将小文件合并,输出结果的文件中每

Hadoop MapReduce编程:计算极值

现在,我们不是计算一个最大值了(想了解如何计算最大值,可以参考Hadoop MapReduce编程:计算最大值),而是计算一个最大值和一个最小值。实际上,实现Mapper和Reducer也是非常简单的,但是我们要输出结果,需要能够区分出最大值和最小值,并同时输出结果,这就需要自定义自己的输出类型,以及需要定义输出格式。 测试数据 数据格式,如下所示: SG 253654006139495 253654006164392 619850464 KG 253654006225166 253654006252433 743485698 UZ 253654006248058 253654006271941 570409379 TT 253654006282019 253654006286839 23236775 BE 253654006276984 253654006301435 597874033 BO 253654006293624 253654006315946 498265375 SR 253654006308428 253654006330442 484613339 SV 253654006320312 253654006345405 629640166 LV 253654006330384 253654006359891 870680704 FJ 253654006351709 253654006374468 517965666 上面文本数据一行一行存储,一行包含4部分,分别表示: 国家代码 起始时间 截止时间 随机成本/权重估值 各个字段之间以

Hadoop MapReduce编程:计算最大值

其实,使用MapReduce计算最大值的问题,和Hadoop自带的WordCount的程序没什么区别,不过在Reducer中一个是求最大值,一个是做累加,本质一样,比较简单。下面我们结合一个例子来实现。 测试数据 我们通过自己的模拟程序,生成了一组简单的测试样本数据。输入数据的格式,截取一个片段,如下所示: SG 253654006139495 253654006164392 619850464 KG 253654006225166 253654006252433 743485698 UZ 253654006248058 253654006271941 570409379 TT 253654006282019 253654006286839 23236775 BE 253654006276984 253654006301435 597874033 BO 253654006293624 253654006315946 498265375 SR 253654006308428 253654006330442 484613339 SV 253654006320312 253654006345405 629640166 LV 253654006330384 253654006359891 870680704 FJ 253654006351709 253654006374468 517965666 上面文本数据一行一行存储,一行包含4部分,分别表示: 国家代码 起始时间 截止时间 随机成本/权重估值 各个字段之间以空格号分隔。我们要计算的结果是,求各个国家(以

HBase-0.90.4集群安装配置

HBase是Hadoop数据库,能够实现随机、实时读写你的Big Data,它是Google的Bigtable的开源实现,可以参考Bigtable的论文Bigtable: A Distributed Storage System for Structured。HBase的存储模型可以如下三个词来概括:distributed, versioned, column-oriented。HBase并非只能在HDFS文件系统上使用, 你可以应用在你的本地文件系统上部署HBase实例来存储数据。 准备工作 hbase-0.90.4.tar.gz [http://labs.renren.com/apache-mirror//hbase/stable/hbase-0.90.4.tar.gz] zookeeper-3.3.4.tar.gz 下面介绍Standalone和Distributed安装过程。 Standalone模式 这种安装模式,是在你的本地文件系统上安装配置一个HBase实例,安装配置比较简单。 首先,要保证你的本地系统能够通过ssh无密码访问,配置如下: ssh-keygen -t dsa cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys 检查一下权限:你的~/.ssh目录的权限是否为755,~/.ssh/authorized_keys的权限是否为644,如果不是,执行下面的命令行: chmod 755 ~/.ssh chmod 644 ~/.ssh/authorized_key