HDFS读文件过程分析:读取文件的Block数据

我们可以从java.io.InputStream类中看到,抽象出一个read方法,用来读取已经打开的InputStream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示: public abstract int read() throws IOException; Hadoop的DFSClient.DFSInputStream类实现了该抽象逻辑,如果我们清楚了如何从HDFS中读取一个文件的一个block的一个字节的原理,更加抽象的顶层只需要迭代即可获取到该文件的全部数据。 从HDFS读文件过程分析:获取文件对应的Block列表(http://shiyanjun.cn/archives/925.html)中,我们已经获取到一个文件对应的Block列表信息,打开一个文件,接下来就要读取实际的物理块数据,我们从下面的几个方面来详细说明读取数据的过程。 Client从Datanode读取文件的一个字节 下面,我们通过分析DFSClient.DFSInputStream中实现的代码,读取HDFS上文件的内容。首先从下面的方法开始: @Override public synchronized int read() throws IOException { int ret = read( oneByteBuf, 0, 1 ); return ( ret <= 0 ) ? -

HDFS 写文件过程分析

HDFS 是一个分布式文件系统,在 HDFS 上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在 HDFS 文件系统上创建并写一个文件,流程如下图(来自《Hadoop:The Definitive Guide》一书)所示: 具体过程描述如下: Client 调用 DistributedFileSystem 对象的 create 方法,创建一个文件输出流(FSDataOutputStream)对象 通过 DistributedFileSystem 对象与 Hadoop 集群的 NameNode 进行一次 RPC 远程调用,在 HDFS 的 Namespace 中创建一个文件条目(Entry),该条目没有任何的 Block 通过 FSDataOutputStream 对象,向 DataNode 写入数据,数据首先被写入 FSDataOutputStream 对象内部的 Buffer 中,然后数据被分割成一个个 Packet 数据包 以 Packet 最小单位,基于 Socket 连接发送到按特定算法选择的 HDFS 集群中一组 DataNode(正常是 3 个,可能大于等于 1)中的一个节点上,在这组 DataNode 组成的 Pipeline 上依次传输 Packet 这组 DataNode 组成的 Pipeline 反方向上,发送 ack,最终由 Pipeline 中第一个 DataNode 节点将 Pipeline a

Kafka+Storm+HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理: 直接使用Storm的Topology对数据进行实时分析处理 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理 实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置K

HDFS读文件过程分析:获取文件对应的Block列表

在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据。对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用DFSClient.DFSDataInputStream类来读取HDFS上一个文件的一段代码来看,如下所示: package org.shirdrn.hadoop.hdfs; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class HdfsFileReader { public static void main(String[] args) { String file = "hdfs://hadoop-cluster-m:8020/data/logs/basis_user_behavior/201405071237_10_10_1_73.log"; Path path = new Path(file); Configuration conf = new Configuration();

HDFS格式化过程分析

我们知道,Namenode启动时可以指定不同的选项,当指定-format选项时,就是格式化Namenode,可以在Namenode类中看到格式化的方法,方法签名如下所示: private static boolean format(Configuration conf, boolean isConfirmationNeeded, boolean isInteractive) throws IOException 在该方法中,首先调用FSNamesystem类的方法,获取到待格式化的name目录和edit目录: Collection<File> editDirsToFormat = Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf); FSNamesystem.getNamespaceEditsDirs(conf); 跟踪调用FSNamesystem类的方法,可以看到,实际上获取到的目录为: name目录:是根据配置的dfs.name.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 edit目录:是根据配置的dfs.name.edits.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 在上面format方法中,创建对应的name目录和edit目录,对应如下代码行: FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,