Hadoop MapReduce处理海量小文件:自定义InputFormat和RecordReader

一般来说,基于Hadoop的MapReduce框架来处理数据,主要是面向海量大数据,对于这类数据,Hadoop能够使其真正发挥其能力。对于海量小文件,不是说不能使用Hadoop来处理,只不过直接进行处理效率不会高,而且海量的小文件对于HDFS的架构设计来说,会占用NameNode大量的内存来保存文件的元数据(Bookkeeping)。另外,由于文件比较小,我们是指远远小于HDFS默认Block大小(64M),比如1k~2M,都很小了,在进行运算的时候,可能无法最大限度地充分Locality特性带来的优势,导致大量的数据在集群中传输,开销很大。 但是,实际应用中,也存在类似的场景,海量的小文件的处理需求也大量存在。那么,我们在使用Hadoop进行计算的时候,需要考虑将小数据转换成大数据,比如通过合并压缩等方法,可以使其在一定程度上,能够提高使用Hadoop集群计算方式的适应性。Hadoop也内置了一些解决方法,而且提供的API,可以很方便地实现。 下面,我们通过自定义InputFormat和RecordReader来实现对海量小文件的并行处理。 基本思路描述如下: 在Mapper中将小文件合并,输出结果的文件中每

Mina框架IoService通用抽象服务详解

IoService是对通信双方所进行的I/O操作的抽象,那么无论是在服务器端还是在客户端,都要进行I/O的读写操作,它们有一些共性,可以抽象出来。这里,我们主要详细说明IoAccectpr和IoConnector以及所基于的IoService抽象服务,都提供哪些操作和数据结构,都是如何构建的。首先,提供一个IoService服务接口相关的继承层次关系的类图,如图所示: 最终使用的Acceptor和Connector是上面继承层次中最下层的实现类。 IoService抽象 实际上,支持I/O操作服务的内容,集中在两个类中:IoService和AbstractIoService,看一下类图: 根据上图中IoService接口定义,我们给出接口中定义的方法,如下所示: public interface IoService { void addListener(IoServiceListener listener); void removeListener(IoServiceListener listener); boolean isDisposing(); boolean isDisposed(); void dispose(); void dispose(boolean awaitTermination); IoHandler getHandler(); void setHandler(IoHandler handler); Map<Long, IoSession> getManagedSess

CentOS 6.4单机环境下安装配置Storm

Storm是一个分布式的、高容错的实时计算系统,在实时性要求比较强的应用场景下,可以用它来处理海量数据。我们尝试着搭建Storm平台,来实现实时计算。下面,我们在CentOS 6.4上安装配置Storm系统。 安装配置 安装配置过程,按照如下步骤进行: 1、安装配置sunjdk 下载sunjdk,并安装Java运行环境: wget http://download.oracle.com/otn/java/jdk/6u45-b06/jdk-6u45-linux-x64.bin chmod +x jdk-6u45-linux-x64-rpm.bin ./jdk-6u45-linux-x64.bin 配置Java运行时环境: vi ~/.bashrc export JAVA_HOME=/usr/java/jdk1.6.0_45/ export PATH=$PATH:$JAVA_HOME/bin export CLASSPATH=$JAVA_HOME/lib/*.jar:$JAVA_HOME/jre/lib/*.jar . ~/.bashrc java -version 2、安装zeromq 执行如下命令,进行下载配置安装: wget http://download.zeromq.org/zeromq-2.2.0.tar.gz tar -zvxf zeromq-2.2.0.tar.gz cd zeromq-2.2.0/ ./configure sudo make sudo make install 3、安装jzmq 安装jzmq需要使用Git下载源码,从源代码编译安装: sudo yum insta

Hadoop MapReduce编程:计算极值

现在,我们不是计算一个最大值了(想了解如何计算最大值,可以参考Hadoop MapReduce编程:计算最大值),而是计算一个最大值和一个最小值。实际上,实现Mapper和Reducer也是非常简单的,但是我们要输出结果,需要能够区分出最大值和最小值,并同时输出结果,这就需要自定义自己的输出类型,以及需要定义输出格式。 测试数据 数据格式,如下所示: SG 253654006139495 253654006164392 619850464 KG 253654006225166 253654006252433 743485698 UZ 253654006248058 253654006271941 570409379 TT 253654006282019 253654006286839 23236775 BE 253654006276984 253654006301435 597874033 BO 253654006293624 253654006315946 498265375 SR 253654006308428 253654006330442 484613339 SV 253654006320312 253654006345405 629640166 LV 253654006330384 253654006359891 870680704 FJ 253654006351709 253654006374468 517965666 上面文本数据一行一行存储,一行包含4部分,分别表示: 国家代码 起始时间 截止时间 随机成本/权重估值 各个字段之间以

Hadoop MapReduce编程:计算最大值

其实,使用MapReduce计算最大值的问题,和Hadoop自带的WordCount的程序没什么区别,不过在Reducer中一个是求最大值,一个是做累加,本质一样,比较简单。下面我们结合一个例子来实现。 测试数据 我们通过自己的模拟程序,生成了一组简单的测试样本数据。输入数据的格式,截取一个片段,如下所示: SG 253654006139495 253654006164392 619850464 KG 253654006225166 253654006252433 743485698 UZ 253654006248058 253654006271941 570409379 TT 253654006282019 253654006286839 23236775 BE 253654006276984 253654006301435 597874033 BO 253654006293624 253654006315946 498265375 SR 253654006308428 253654006330442 484613339 SV 253654006320312 253654006345405 629640166 LV 253654006330384 253654006359891 870680704 FJ 253654006351709 253654006374468 517965666 上面文本数据一行一行存储,一行包含4部分,分别表示: 国家代码 起始时间 截止时间 随机成本/权重估值 各个字段之间以空格号分隔。我们要计算的结果是,求各个国家(以

MIna框架I/O Filter Chain层设计

I/O Filter Chain层是介于I/O Service层与I/O Handler层之间的一层,从它的命名上可以看出,这个层可以根据实际应用的需要,设置一组IoFilter来对I/O Service层与I/O Handler层之间传输数据进行过滤,任何需要在这两层之间进行处理的逻辑都可以放到IoFilter中。 我们看一下IoFilter的抽象层次设计,如图所示: 通过上述类图可见,要实现一个自定义的IoFilter,一般是直接实现IoFilterAdapter类。同时,Mina也给出了几类常用的开发IoFilter的实现类,如下所示: LoggingFilter记录所有事件和请求 ProtocolCodecFilter将到来的ByteBuffer转换成消息对象(POJO) CompressionFilter压缩数据 SSLFilter增加SSL – TLS – StartTLS支持 想要实现一个自定义的IoFilter实现类,只需要基于上述给出的几个实现类即可。 如果想要实现自己的IoFilter,可以参考如下例子: public class MyFilter extends IoFilterAdapter { @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { // Some logic here...

MIna框架I/O Service层设计

Mina从2.0版本以后,它的设计让人感觉到非常的优雅。它对网络应用通信框架的3个层进行了更好的抽象,以及在功能逻辑上的划分,同时又保证了 作为一个网络应用通信框架的统一。划分的3个层分别为: I/O Service层 I/O Filter Chain层 I/O Handler层 这里,我们重点关注I/O Service层。作为一个基于网络通信的应用,无论是服务器还是客户端角色,都要和网络I/O打交道,比如,服务器端需要创建服务器端Socket,监听指定端口并等待请求的带来,而客户端需要连接到服务器端指定的监听端口,使用网络服务。一般来说,这些I/O操作都比较复杂,而且很难在编 码中进行很好地控制,Mina的I/O Service层就是处理这些与实际的网络I/O相关的操作(事件)。 我们先看一下,对于服务器端和客户端,I/O Service层是如何设计的。类设计上的关系,作为这一层的最顶层抽象就是IoService接口类,如图所示: 通过上图,我们可以看到,IoService抽象的服务(功能)有如下几个: 管理IoSession:创建和删除IoSession,探测会话Idle状态 Filter Chain管理:处理过滤器链,允许用户修改过

Apache Mina通信框架架构与应用

Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于 TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供 JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步 IO 默认使用的是 JAVA NIO 作为底层支持)操作的编程模型。 从官网文档“MINA based Application Architecture”中可以看到Mina作为一个通信层框架,在实际应用所处的位置,如图所示: Mina位于用户应用程序和底层Java网络API(和in-VM通信)之间,我们开发基于Mina的网络应用程序,就无需关心复杂的通信细节。 应用整体架构 再看一下,Mina提供的基本组件,如图所示: 也就是说,无论是客户端还是服务端,使用Mina框架实现通信的逻辑分层在概念上统一的,即包含如下三层: I/O Service – Performs actual I/O I/O Filter Chain – Filters/Transforms bytes into desired Data Structures and vice-versa I/O Handler – Here resides the actual business logic

Maven构建应用程序常用配置

使用Maven来构建应用程序,可以非常方便地管理应用相关的资源。众所周知,应用程序中涉及到的一些依赖关系,如Java应用程序依赖jar文件,如果只是手动找到相应的资源,可能需要花费一些时间。而且,即使已经积累了库文件,在未来应用程序升级以后,还要考虑到依赖库文件的升级情况,再次搜索收集。 还有一个问题,对应用程序依赖文件的管理是个非常复杂工作,占用存储空间不说,还可能因为应用之间的版本问题导致依赖冲突。使用Maven的pom模型来构建应用程序,可以更加有效地的管理,而且配置内容非常清晰(有时多了,可能pom文件显得有点臃肿)。 下面将常用的Maven配置,整理如下,以备参考。首先,整理一个简单的目录,作为快速查询之用: 设置字符集 拷贝src/main/resources/资源文件 编译代码 、编译打包成jar文件 构建测试用例配置 输出依赖jar文件到指定目录 配置指定的repository 将应用及其依赖jar文件打成一个jar文件 具体配置的详细内容,如下所示: 1、设置字符集 <properties> <project.build.sourceEncoding>UTF-8</project.b

HBase-0.90.4集群安装配置

HBase是Hadoop数据库,能够实现随机、实时读写你的Big Data,它是Google的Bigtable的开源实现,可以参考Bigtable的论文Bigtable: A Distributed Storage System for Structured。HBase的存储模型可以如下三个词来概括:distributed, versioned, column-oriented。HBase并非只能在HDFS文件系统上使用, 你可以应用在你的本地文件系统上部署HBase实例来存储数据。 准备工作 hbase-0.90.4.tar.gz [http://labs.renren.com/apache-mirror//hbase/stable/hbase-0.90.4.tar.gz] zookeeper-3.3.4.tar.gz 下面介绍Standalone和Distributed安装过程。 Standalone模式 这种安装模式,是在你的本地文件系统上安装配置一个HBase实例,安装配置比较简单。 首先,要保证你的本地系统能够通过ssh无密码访问,配置如下: ssh-keygen -t dsa cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys 检查一下权限:你的~/.ssh目录的权限是否为755,~/.ssh/authorized_keys的权限是否为644,如果不是,执行下面的命令行: chmod 755 ~/.ssh chmod 644 ~/.ssh/authorized_key

HBase Thrift客户端Java API实践

HBase的Thrift API定义,可以通过链接 http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift?view=markup看到,我们需要安装Thrift编译器,才能生成HBase跨语言的API。 首先下载上面链接的内容,保存为Hbase.thrift。 然后,执行如下命令,生成不同编程语言的HBase API: [hadoop@master hbase]$ thrift --gen cpp Hbase.thrift [hadoop@master hbase]$ thrift --gen java Hbase.thrift [hadoop@master hbase]$ thrift --gen py Hbase.thrift [hadoop@master hbase]$ thrift --gen perl Hbase.thrift [hadoop@master hbase]$ thrift --gen csharp Hbase.thrift [hadoop@master hbase]$ thrift --gen php Hbase.thrift [hadoop@master hbase]$ thrift --gen js Hbase.thrift [hadoop@master hbase]$ thrift --gen go Hbase.thrift [hadoop@master hbase]$ thrift --gen erl Hbase.thrift [hadoop@master hbase]$ thrift --gen delphi Hbase.thrift [hadoop@master hbase]$ thrift --

CentOS安装和使用Thrift

Thrift是Apache的一个开源的跨语言服务开发框架,它提供了一个代码生成引擎来构建服务,支持C++,Java,Python,PHP,Ruby,Erlang,Perl,Haskell,C#,Cocoa,JavaScript,Node.js,Smalltalk,OCaml,Delphi等多种编程语言。 一般来说,使用Thrift来开发应用程序,主要建立在两种场景下: 第一,在我们开发过程中,一个比较大的项目需要多个团队进行协作,而每个团队的成员在编程技术方面的技能可能不一定相同,为了实现这种跨语言的开发氛围,使用Thrift来构建服务 第二,企业之间合作,在业务上不可避免出现跨语言的编程环境,使用Thrift可以达到类似Web Services的跨平台的特性 安装配置Thrift Thrift的编译器使用C++编写的,在安装编译器之前,首先应该保证操作系统基本环境支持C++的编译,安装相关依赖的软件包,如下所示 sudo yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel 下载Thrift的软件包,并解压缩: wget http://mirrors.cnnic.cn/apache/thri