HDFS读文件过程分析:读取文件的Block数据

我们可以从java.io.InputStream类中看到,抽象出一个read方法,用来读取已经打开的InputStream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示: public abstract int read() throws IOException; Hadoop的DFSClient.DFSInputStream类实现了该抽象逻辑,如果我们清楚了如何从HDFS中读取一个文件的一个block的一个字节的原理,更加抽象的顶层只需要迭代即可获取到该文件的全部数据。 从HDFS读文件过程分析:获取文件对应的Block列表(http://shiyanjun.cn/archives/925.html)中,我们已经获取到一个文件对应的Block列表信息,打开一个文件,接下来就要读取实际的物理块数据,我们从下面的几个方面来详细说明读取数据的过程。 Client从Datanode读取文件的一个字节 下面,我们通过分析DFSClient.DFSInputStream中实现的代码,读取HDFS上文件的内容。首先从下面的方法开始: @Override public synchronized int read() throws IOException { int ret = read( oneByteBuf, 0, 1 ); return ( ret <= 0 ) ? -

HDFS 写文件过程分析

HDFS 是一个分布式文件系统,在 HDFS 上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在 HDFS 文件系统上创建并写一个文件,流程如下图(来自《Hadoop:The Definitive Guide》一书)所示: 具体过程描述如下: Client 调用 DistributedFileSystem 对象的 create 方法,创建一个文件输出流(FSDataOutputStream)对象 通过 DistributedFileSystem 对象与 Hadoop 集群的 NameNode 进行一次 RPC 远程调用,在 HDFS 的 Namespace 中创建一个文件条目(Entry),该条目没有任何的 Block 通过 FSDataOutputStream 对象,向 DataNode 写入数据,数据首先被写入 FSDataOutputStream 对象内部的 Buffer 中,然后数据被分割成一个个 Packet 数据包 以 Packet 最小单位,基于 Socket 连接发送到按特定算法选择的 HDFS 集群中一组 DataNode(正常是 3 个,可能大于等于 1)中的一个节点上,在这组 DataNode 组成的 Pipeline 上依次传输 Packet 这组 DataNode 组成的 Pipeline 反方向上,发送 ack,最终由 Pipeline 中第一个 DataNode 节点将 Pipeline a

Kafka+Storm+HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理: 直接使用Storm的Topology对数据进行实时分析处理 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理 实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置K

HDFS读文件过程分析:获取文件对应的Block列表

在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据。对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用DFSClient.DFSDataInputStream类来读取HDFS上一个文件的一段代码来看,如下所示: package org.shirdrn.hadoop.hdfs; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class HdfsFileReader { public static void main(String[] args) { String file = "hdfs://hadoop-cluster-m:8020/data/logs/basis_user_behavior/201405071237_10_10_1_73.log"; Path path = new Path(file); Configuration conf = new Configuration();

Hive-0.12.0的Web接口HWI安装、配置、改造及使用

使用Hive的HWI接口,可以通过在Web页面上提交HQL查询操作,并浏览查询结果数据。默认情况下,HWI只支持浏览结果数据,不能够下载查询结果文件(当然,HWI可能也是考虑通过Web下载大量的结果数据,对服务器造成压力,或者处于安全方面的考虑)。我们对HWI进行了简单的改造,改造内容主要是增加了一个内置的文件服务器,可以通过页面进行查询,然后下载结果文件。 HWI安装配置 首先,要保证Hadoop集群正常运行,Hive能够正常使用。 先要安装Ant,如下所示: wget http://mirrors.hust.edu.cn/apache//ant/binaries/apache-ant-1.9.4-bin.tar.gz tar xvzf apache-ant-1.9.4-bin.tar.gz ln -s /usr/local/apache-ant-1.9.4-bin /usr/local/ant 修改Hive的环境配置文件hive-env.sh,增加如下配置内容: export ANT_LIB=/usr/local/ant 将如下JAR文件拷贝到${HIVE_HOME}/lib目录下面: // 用于编译JSP文件 jasper-compiler-5.5.23.jar jasper-runtime-5.5.23.jar // 替换默认的servlet-api-2.5-20081211.jar,我使用的是apache-tomcat-7.0.53/lib下面的s

HDFS格式化过程分析

我们知道,Namenode启动时可以指定不同的选项,当指定-format选项时,就是格式化Namenode,可以在Namenode类中看到格式化的方法,方法签名如下所示: private static boolean format(Configuration conf, boolean isConfirmationNeeded, boolean isInteractive) throws IOException 在该方法中,首先调用FSNamesystem类的方法,获取到待格式化的name目录和edit目录: Collection<File> editDirsToFormat = Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf); FSNamesystem.getNamespaceEditsDirs(conf); 跟踪调用FSNamesystem类的方法,可以看到,实际上获取到的目录为: name目录:是根据配置的dfs.name.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 edit目录:是根据配置的dfs.name.edits.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 在上面format方法中,创建对应的name目录和edit目录,对应如下代码行: FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,

Hive JOIN使用详解

Hive是基于Hadoop平台的,它提供了类似SQL一样的查询语言HQL。有了Hive,如果使用过SQL语言,并且不理解Hadoop MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的MR Job。 对于最基本的HQL查询我们不再累述,这里主要说明Hive中进行统计分析时使用到的JOIN操作。在说明Hive JOIN之前,我们先简单说明一下,Hadoop执行MR Job的基本过程(运行机制),能更好的帮助我们理解HQL转换到底层的MR Job后是如何执行的。我们重点说明MapReduce执行过程中,从Map端到Reduce端这个过程(Shuffle)的执行情况,如图所示(来自《Hadoop: The Definitive Guide》): 基本执行过程,描述如下: 一个InputSplit输入到map,会运行我们实现的Mapper的处理逻辑,对数据进行映射操作。 map输出时,会首先将输出中间结果写入到map自带的buffer中(buffer默认大小为100M,可以通过io.sort.mb配置)。 map自带的buffer使用容

Hadoop-2.2.0集群安装配置实践

Hadoop 2.x和1.x已经大不相同了,应该说对于存储计算都更加通用了。Hadoop 2.x实现了用来管理集群资源的YARN框架,可以面向任何需要使用基于HDFS存储来计算的需要,当然MapReduce现在已经作为外围的插件式的计算框架,你可以根据需要开发或者选择合适的计算框架。目前,貌似对MapReduce支持还是比较好的,毕竟MapReduce框架已经还算成熟。其他一些基于YARN框架的标准也在开发中。 YARN框架的核心是资源的管理和分配调度,它比Hadoop 1.x中的资源分配的粒度更细了,也更加灵活了,它的前景应该不错。由于极大地灵活性,所以在使用过程中由于这些配置的灵活性,可能使用的难度也加大了一些。另外,我个人觉得,YARN毕竟还在发展之中,也有很多不成熟的地方,各种问题频频出现,资料也相对较少,官方文档有时更新也不是很及时,如果我选择做海量数据处理,可能YARN还不能满足生产环境的需要。如果完全使用MapReduce来做计算,还是选择相对更加成熟的Hadoop 1.x版本用于生产环境。 下面使用4台机器,操作系统为CentOS 6.4 64位,一台做主节点,另外三台做从节点,实践集

ZooKeeper应用案例

我们通过学习借鉴,哪些项目或应用都使用了ZooKeeper,可以了解我们的应用使用ZooKeeper是否能真正地带来价值,当然,有些项目可能也未必非常适合使用ZooKeeper,我们要批判地学习、借鉴和吸收。 下面是一些使用了ZooKeeper实现的案例: HDFS HA(QJM) Hadoop 2.x之前的版本,HDFS集群中Namenode是整个集群的中央元数据存储和服务节点,它存在SPOF的问题。在2.x版本中,提出了各种HA方案,避免Namenode的SPOF问题,其中基于QJM(Quorum Journal Manager)的方案可以解决这个问题:使用QJM的方案中,HDFS集群中存在两类节点,一类是Namenode节点(包括Active状态的Namenode,和Standby状态的Namenode),另一类是JournalNode,进行容错。当Active状态的Namenode元数据发生改变时,通过JournalNode进程(ZooKeeper集群中)来监视这种变化,然后同步到Standby状态的Namenode节点(实际上同步的是EditLog镜像文件内容的变更)。 当Active状态的节点发生故障后,Standby节点的Namenode自动切换,并接管HDFS集群中Active状态Namenode的服务,用来向客户端提供元数据服务。

ZooKeeper架构设计及其应用要点

ZooKeeper是一个开源的分布式服务框架,它是Apache Hadoop项目的一个子项目,主要用来解决分布式应用场景中存在的一些问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置管理等,它支持Standalone模式和分布式模式,在分布式模式下,能够为分布式应用提供高性能和可靠地协调服务,而且使用ZooKeeper可以大大简化分布式协调服务的实现,为开发分布式应用极大地降低了成本。 总体架构 ZooKeeper分布式协调服务框架的总体架构,如图所示: ZooKeeper集群由一组Server节点组成,这一组Server节点中存在一个角色为Leader的节点,其他节点都为Follower。当客户端Client连接到ZooKeeper集群,并且执行写请求时,这些请求会被发送到Leader节点上,然后Leader节点上数据变更会同步到集群中其他的Follower节点。 Leader节点在接收到数据变更请求后,首先将变更写入本地磁盘,以作恢复之用。当所有的写请求持久化到磁盘以后,才会将变更应用到内存中。 ZooKeeper使用了一种自定义的原子消息协议,在消息层的这种原子特性,保证了整个协调系统中的节点数据或状态

ZooKeeper-3.3.4集群安装配置

ZooKeeper是一个分布式开源框架,提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)等,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper本身可以以Standalone模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。 有关ZooKeeper的介绍,网上很多,也可以参考文章后面,我整理的一些相关链接。 ZooKeeper的安装配置还算比较容易的,下面,我们简单说明一下ZooKeeper的配置。 ZooKeeper Standalone模式 从Apache网站上(zookeeper.apache.org)下载ZooKeeper软件包,我选择了3.3.4版本的(zookeeper-3.3.4.tar.gz),在一台Linux机器上安装非常容易,只需要解压缩后,简单配置一下即可以启动ZooKeeper服务器进程。 将zookeeper-3.3.4/conf目录下面的 zoo_sample.cfg修改为zoo.cfg,配置文件内容如下

Hadoop Job使用第三方依赖jar文件

当我们实现了一个Hadoop MapReduce Job以后,而这个Job可能又依赖很多外部的jar文件,在Hadoop集群上运行时,有时会出现找不到具体Class的异常。出现这种问题,基本上就是在Hadoop Job执行过程中,没有从执行的上下文中找到对应的jar文件(实际是unjar的目录,目录里面是对应的Class文件)。所以,我们自然而然想到,正确配置好对应的classpath,MapReduce Job运行时就能够找到。 有两种方式可以更好地实现,一种是设置HADOOP_CLASSPATH,将Job所依赖的jar文件加载到HADOOP_CLASSPATH,这种配置只针对该Job生效,Job结束之后HADOOP_CLASSPATH会被清理;另一种方式是,直接在构建代码的时候,将依赖jar文件与Job代码打成一个jar文件,这种方式可能会使得最终的jar文件比较大,但是结合一些代码构建工具,如Maven,可以在依赖控制方面保持一个Job一个依赖的构建配置,便于管理。下面,我们分别说明这两种方式。 设置HADOOP_CLASSPATH 比如,我们有一个使用HBase的应用,操作HBase数据库中表,肯定需要ZooKeeper,所以对应的jar文件的位置都要设置正确,让运行时Job能够