HDFS读文件过程分析:读取文件的Block数据

我们可以从java.io.InputStream类中看到,抽象出一个read方法,用来读取已经打开的InputStream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示: public abstract int read() throws IOException; Hadoop的DFSClient.DFSInputStream类实现了该抽象逻辑,如果我们清楚了如何从HDFS中读取一个文件的一个block的一个字节的原理,更加抽象的顶层只需要迭代即可获取到该文件的全部数据。 从HDFS读文件过程分析:获取文件对应的Block列表(http://shiyanjun.cn/archives/925.html)中,我们已经获取到一个文件对应的Block列表信息,打开一个文件,接下来就要读取实际的物理块数据,我们从下面的几个方面来详细说明读取数据的过程。 Client从Datanode读取文件的一个字节 下面,我们通过分析DFSClient.DFSInputStream中实现的代码,读取HDFS上文件的内容。首先从下面的方法开始: @Override public synchronized int read

HDFS写文件过程分析

HDFS是一个分布式文件系统,在HDFS上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在HDFS文件系统上创建并写一个文件,流程如下图(来自《Hadoop:The Definitive Guide》一书)所示: 具体过程描述如下: Client调用DistributedFileSystem对象的create方法,创建一个文件输出流(FSDataOutputStream)对象 通过DistributedFileSystem对象与Hadoop集群的NameNode进行一次RPC远程调用,在HDFS的Namespace中创建一个文件条目(Entry),该条目没有任何的Block 通过FSDataOutputStream对象,向DataNode写入数据,数据首先被写入FSDataOutputStream对象内部的Buffer中,然后数据被分割成一个个Packet数据包 以Packet最小单位,基于Socket连接发送到按特定算法选择的HDFS集群中一组DataNode(正常是3个,可能大于等于1)中的一个节点上,在这组DataNode组成的Pipeline上依次传输Packet 这组DataNode组成的Pipeline反方向上,发送ack,最终

Kafka+Storm+HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理: 直接使用Storm的Topology对数据进行实时分析处理 整合Storm+HDFS,将消息处理后写入HDFS进行

Flume(NG)架构设计要点及配置实践

Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元,带有一个可选的消息头 Flow:Event从源点到达目的点的迁移的抽象 Client:操作位于源点处的Event,将其发送到Flume Agent Agent:一个独立的Flume进程,包含组件Source、Channel、Sink Source:用来消费传递到该组件的Event Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话) Flume NG架构,如图所示: 外

HDFS读文件过程分析:获取文件对应的Block列表

在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据。对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用DFSClient.DFSDataInputStream类来读取HDFS上一个文件的一段代码来看,如下所示: package org.shirdrn.hadoop.hdfs; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class HdfsFileReader { public static void main(String[] args) { String file = "hdfs://hadoop-cluster-m:8020/data/logs/basis_user_behavior/201405071237_10_10_1_73.log";

CentOS-5.9系统下Ganglia-3.6.0监控集群安装配置实践

Ganglia是UC Berkeley发起的一个开源集群监视项目,设计用于测量数以千计的节点。Ganglia的核心包含gmond、gmetad以及一个Web前端。Ganglia集群主要用来监控系统性能,它由RRDTool工具处理数据,并生成相应的监控相关的图形,并提供一个Web控制台来直观地提供给客户端,管理员可以通过丰富的图形来对整个被监控的物理机器集群节点进行评估。Ganglia能够监控的内容很多,如CPU利用率 、Mem利用率、硬盘利用率, I/O负载、网络流量情况等,通过曲线很容易见到每个节点的工作状态,对合理调整、分配系统资源,提高系统整体性能起到重要作用。需要知道的是,Ganglia可以做系统监控,但是,目前它不支持服务器异常或故障报警功能。 Ganglia监控集群架构 下面,我们参考《Monitoring with Ganglia》一书,对Ganglia的架构有一个基本了解,然后在配置使用过程中,能够更好地理解监控的基本原理。下面是Ganglia的整体架构,如图所示: 通过上图我们可以看到,Gangli

Hive-0.12.0的Web接口HWI安装、配置、改造及使用

使用Hive的HWI接口,可以通过在Web页面上提交HQL查询操作,并浏览查询结果数据。默认情况下,HWI只支持浏览结果数据,不能够下载查询结果文件(当然,HWI可能也是考虑通过Web下载大量的结果数据,对服务器造成压力,或者处于安全方面的考虑)。我们对HWI进行了简单的改造,改造内容主要是增加了一个内置的文件服务器,可以通过页面进行查询,然后下载结果文件。 HWI安装配置 首先,要保证Hadoop集群正常运行,Hive能够正常使用。 先要安装Ant,如下所示: wget http://mirrors.hust.edu.cn/apache//ant/binaries/apache-ant-1.9.4-bin.tar.gz tar xvzf apache-ant-1.9.4-bin.tar.gz ln -s /usr/local/apache-ant-1.9.4-bin /usr/local/ant 修改Hive的环境配置文件hive-env.sh,增加如下配置内容: export ANT_LIB=/usr/local/ant 将如下JAR文件拷贝到${HIVE_HOME}/lib目录下面: // 用于编译JSP文件 jasper-compiler-5.5.23.jar jasper-runtim

Memcached服务器安装、配置、使用详解

我使用的是CentOS 6.4系统,安装的Memcached版本为1.4.20。这里,记录一下安装配置的过程,以及如何使用一些常用的客户端来访问Memcached存储的数据。 安装配置 首先,编译、安装、配置libevent库,执行如下命令: wget https://github.com/downloads/libevent/libevent/libevent-1.4.14b-stable.tar.gz tar xvzf libevent-1.4.14b-stable.tar.gz ln -s /usr/local/libevent-1.4.14b-stable /usr/local/libevent cd /usr/local/libevent ./configure make make install 然后,编译、安装、配置Memcached,执行如下命令行: wget http://www.memcached.org/files/memcached-1.4.20.tar.gz tar xvzf memcached-1.4.20.tar.gz ln -s /usr/local/memcached-1.4.20 /usr/local/memcached ./configure --with-libevent=/usr/local/libevent/ make make install 如果没有出错,安装成功。 管理memcached服务 启动Memcached 一般情况下,简单地

Node.js入门学习笔记

关于Node.js介绍,我们引用官网(http://nodejs.org/)的一段文字说明: Node.js is a platform built on Chrome's JavaScript runtime for easily building fast, scalable network applications. Node.js uses an event-driven, non-blocking I/O model that makes it lightweight and efficient, perfect for data-intensive real-time applications that run across distributed devices. Google Chrome浏览器基于V8的,它是一个开源的JavaScript引擎,能够编译和执行JavaScript代码,在执行速度上有很大的优势。使用Node.js能够很容易地构建快速、可扩展的网络应程序,它使用了事件驱动、非阻塞I/O模型实现,具有轻量、高效的特点,适合于构建运行在分布地设备之上的数据密集型实时应用程序。 下面通过参考各种资料,从各个方面,概括地总结一下Node.js,是我们对Node.js有一个直观的了解: 使用JavaScript运行于服务端的平台上,自然继承了JavaSc

Spring+Mybatis实现动态SQL查询

在报表类应用中,通常需要根据不同的维度去组合复杂的查询条件,然后构造SQL去执行查询。如果只是通过在程序中简单地拼接SQL语句,工作量会非常大,而且代码可能也非常难以维护。Mybatis支持动态SQL查询功能,可以通过配置动态的SQL来简化程序代码中复杂性,不过,这个颇有点XML编程的韵味,通过XML来处理复杂的数据判断、循环的功能,其实也很好理解。 准备工作 下面,我们首先创建一个MySQL示例表,如下所示: CREATE TABLE `traffic_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `domain` varchar(64) NOT NULL, `traffic_host` varchar(64) NOT NULL, `month` varchar(8) NOT NULL, `monthly_traffic` int(11) DEFAULT '0', `global_traffic_rank` int(11) DEFAULT '0', `native_traffic_rank` int(11) DEFAULT '0', `rank_in_country` varchar(64) DEFAULT NULL, `address` varchar(200) DEFAULT NULL, `email` var

HDFS格式化过程分析

我们知道,Namenode启动时可以指定不同的选项,当指定-format选项时,就是格式化Namenode,可以在Namenode类中看到格式化的方法,方法签名如下所示: private static boolean format(Configuration conf, boolean isConfirmationNeeded, boolean isInteractive) throws IOException 在该方法中,首先调用FSNamesystem类的方法,获取到待格式化的name目录和edit目录: Collection<File> editDirsToFormat = Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf); FSNamesystem.getNamespaceEditsDirs(conf); 跟踪调用FSNamesystem类的方法,可以看到,实际上获取到的目录为: name目录:是根据配置的dfs.name.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 edit目录:是根据配置的dfs.name.edits.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 在上面format方法中,创建对应的na

内部排序算法:基数排序

基本思想 基数排序是一种非比较型整数排序算法,其原理是将整数按位数切割成不同的数字,然后按每个位数分别比较。由于整数也可以表达字符串(比如名字或日期)和特定格式的浮点数,所以基数排序也不是只能使用于整数。 基数排序可以采用两种方式: LSD(Least Significant Digital):从待排序元素的最右边开始计算(如果是数字类型,即从最低位个位开始)。 MSD(Most Significant Digital):从待排序元素的最左边开始计算(如果是数字类型,即从最高位开始)。 我们以LSD方式为例,从数组R[1..n]中每个元素的最低位开始处理,假设基数为radix,如果是十进制,则radix=10。基本过程如下所示: 计算R中最大的元素,求得位数最大的元素,最大位数记为distance; 对每一位round<=distance,计算R[i] % radix即可得到; 将上面计算得到的余数作为bucket编号,每个bucket中可能存放多个数组R的元素; 按照bucket编号的顺序,收集bucket中元素,就地替换数