Mina框架IoSession详解

通过Mina官 网文档,我们可以看到,有如下几个状态: Connected : the session has been created and is available Idle : the session hasn’t processed any request for at least a period of time (this period is configurable)Closing : the session is being closed (the remaining messages are being flushed, cleaning up is not terminated) Idle for read : no read has actually been made for a period of time Idle for write : no write has actually been made for a period of time Idle for both : no read nor write for a period of time Closed : The session is now closed, nothing else can be done to revive it. 对应的状态迁移图,如图所示: 通过上面的状态图,我们可以看出,是哪个事件的发生使得IoSession进入哪个状态,比较直观明了。下面,我们看一下IoSession对应的设计,类继承关系如下所示: 对于IoSession接口类,我在上图把具有不同类型功能的操作进行了分类,说明如下: 一个IoSession实例可以访问/持有哪些数据

Mina框架IoHandler与IoProcessor详解

我们已经知道,IoHandler是开发网络应用程序的时候,与实际业务逻辑相关的组件,即属于Mina核心框架之外的应用层组件。从Mina 官方文档上,我们几乎没有看到对IoProcessor的说明,实际上IoProcessor对实际使用Mina框架的开发人员透明,无需你去了解它的实现逻辑,它在Mina中用来处理实际的I/O操作。 我们分析的思路是,先分别对IoHandler与IoProcessor进行单独分析,然后再阐述它们之间的不同以及联系。 IoHandler 当我们通过IoSession执行相关操作的时候,如写数据,这些事件会触发Mina框架抽象的IoService实例,从而调用Mina框架底层的相关组件进行处理。这时,配置的IoHandler就被用来处理Mina所触发的相关事件,处理这些事件的操作被抽象出来。 实际上,IoHandler的继承层次非常简单,也说明了基于Mina框架开发实际网络应用程序,对业务逻辑的处理也还是相对比较容易的。看一下 IoHandler的继承层次,如图所示: IoHandler接口所定义的操作,一共定义了7个处理事件的操作,如下所示: public interface IoHandler { void sessionCreated(IoSession sessi

Hadoop MapReduce处理海量小文件:压缩文件

在HDFS上存储文件,大量的小文件是非常消耗NameNode内存的,因为每个文件都会分配一个文件描述符,NameNode需要在启动的时候加载全部文件的描述信息,所以文件越多,对 NameNode来说开销越大。 我们可以考虑,将小文件压缩以后,再上传到HDFS中,这时只需要一个文件描述符信息,自然大大减轻了NameNode对内存使用的开销。MapReduce计算中,Hadoop内置提供了如下几 种压缩格式: DEFLATE gzip bzip2 LZO 使用压缩文件进行MapReduce计算,它的开销在于解压缩所消耗的时间,在特定的应用场景中这个也是应该考虑的问题。不过对于海量小文件的应用场景,我们压缩了小文件,却换 来的Locality特性。 假如成百上千的小文件压缩后只有一个Block,那么这个Block必然存在一个DataNode节点上,在计算的时候输入一个InputSplit,没有网络间传输数据的开销,而且是在本地进行 运算。倘若直接将小文件上传到HDFS上,成百上千的小Block分布在不同DataNode节点上,为了计算可能需要“移动数据”之后才能进行计算。文件很少的情况下,除了NameNode内 存使用开销以外,可能感觉不到网

Hadoop MapReduce处理海量小文件:基于CombineFileInputFormat

在使用Hadoop处理海量小文件的应用场景中,如果你选择使用CombineFileInputFormat,而且你是第一次使用,可能你会感到有点迷惑。虽然,从这个处理方案的思想上很容易理解,但是可能会遇到这样那样的问题。 使用CombineFileInputFormat作为Map任务的输入规格描述,首先需要实现一个自定义的RecordReader。 CombineFileInputFormat的大致原理是,他会将输入多个数据文件(小文件)的元数据全部包装到CombineFileSplit类里面。也就是说,因为小文件的情况下,在HDFS中都是单Block的文件,即一个文件一个Block,一个CombineFileSplit包含了一组文件Block,包括每个文件的起始偏移(offset),长度(length),Block位置(localtions)等元数据。如果想要处理一个CombineFileSplit,很容易想到,对其包含的每个InputSplit(实际上这里面没有这个,你需要读取一个小文件块的时候,需要构造一个FileInputSplit对象)。 在执行MapReduce任务的时候,需要读取文件的文本行(简单一点是文本行,也可能是其他格式数据)。那么对于CombineFileSplit来说,你需要处理其包含的小文

Hadoop MapReduce处理海量小文件:自定义InputFormat和RecordReader

一般来说,基于Hadoop的MapReduce框架来处理数据,主要是面向海量大数据,对于这类数据,Hadoop能够使其真正发挥其能力。对于海量小文件,不是说不能使用Hadoop来处理,只不过直接进行处理效率不会高,而且海量的小文件对于HDFS的架构设计来说,会占用NameNode大量的内存来保存文件的元数据(Bookkeeping)。另外,由于文件比较小,我们是指远远小于HDFS默认Block大小(64M),比如1k~2M,都很小了,在进行运算的时候,可能无法最大限度地充分Locality特性带来的优势,导致大量的数据在集群中传输,开销很大。 但是,实际应用中,也存在类似的场景,海量的小文件的处理需求也大量存在。那么,我们在使用Hadoop进行计算的时候,需要考虑将小数据转换成大数据,比如通过合并压缩等方法,可以使其在一定程度上,能够提高使用Hadoop集群计算方式的适应性。Hadoop也内置了一些解决方法,而且提供的API,可以很方便地实现。 下面,我们通过自定义InputFormat和RecordReader来实现对海量小文件的并行处理。 基本思路描述如下: 在Mapper中将小文件合并,输出结果的文件中每

Mina框架IoService通用抽象服务详解

IoService是对通信双方所进行的I/O操作的抽象,那么无论是在服务器端还是在客户端,都要进行I/O的读写操作,它们有一些共性,可以抽象出来。这里,我们主要详细说明IoAccectpr和IoConnector以及所基于的IoService抽象服务,都提供哪些操作和数据结构,都是如何构建的。首先,提供一个IoService服务接口相关的继承层次关系的类图,如图所示: 最终使用的Acceptor和Connector是上面继承层次中最下层的实现类。 IoService抽象 实际上,支持I/O操作服务的内容,集中在两个类中:IoService和AbstractIoService,看一下类图: 根据上图中IoService接口定义,我们给出接口中定义的方法,如下所示: public interface IoService { void addListener(IoServiceListener listener); void removeListener(IoServiceListener listener); boolean isDisposing(); boolean isDisposed(); void dispose(); void dispose(boolean awaitTermination); IoHandler getHandler(); void setHandler(IoHandler handler); Map<Long, IoSession> getManagedSess

CentOS 6.4单机环境下安装配置Storm

Storm是一个分布式的、高容错的实时计算系统,在实时性要求比较强的应用场景下,可以用它来处理海量数据。我们尝试着搭建Storm平台,来实现实时计算。下面,我们在CentOS 6.4上安装配置Storm系统。 安装配置 安装配置过程,按照如下步骤进行: 1、安装配置sunjdk 下载sunjdk,并安装Java运行环境: wget http://download.oracle.com/otn/java/jdk/6u45-b06/jdk-6u45-linux-x64.bin chmod +x jdk-6u45-linux-x64-rpm.bin ./jdk-6u45-linux-x64.bin 配置Java运行时环境: vi ~/.bashrc export JAVA_HOME=/usr/java/jdk1.6.0_45/ export PATH=$PATH:$JAVA_HOME/bin export CLASSPATH=$JAVA_HOME/lib/*.jar:$JAVA_HOME/jre/lib/*.jar . ~/.bashrc java -version 2、安装zeromq 执行如下命令,进行下载配置安装: wget http://download.zeromq.org/zeromq-2.2.0.tar.gz tar -zvxf zeromq-2.2.0.tar.gz cd zeromq-2.2.0/ ./configure sudo make sudo make install 3、安装jzmq 安装jzmq需要使用Git下载源码,从源代码编译安装: sudo yum insta

Hadoop MapReduce编程:计算极值

现在,我们不是计算一个最大值了(想了解如何计算最大值,可以参考Hadoop MapReduce编程:计算最大值),而是计算一个最大值和一个最小值。实际上,实现Mapper和Reducer也是非常简单的,但是我们要输出结果,需要能够区分出最大值和最小值,并同时输出结果,这就需要自定义自己的输出类型,以及需要定义输出格式。 测试数据 数据格式,如下所示: SG 253654006139495 253654006164392 619850464 KG 253654006225166 253654006252433 743485698 UZ 253654006248058 253654006271941 570409379 TT 253654006282019 253654006286839 23236775 BE 253654006276984 253654006301435 597874033 BO 253654006293624 253654006315946 498265375 SR 253654006308428 253654006330442 484613339 SV 253654006320312 253654006345405 629640166 LV 253654006330384 253654006359891 870680704 FJ 253654006351709 253654006374468 517965666 上面文本数据一行一行存储,一行包含4部分,分别表示: 国家代码 起始时间 截止时间 随机成本/权重估值 各个字段之间以

Hadoop MapReduce编程:计算最大值

其实,使用MapReduce计算最大值的问题,和Hadoop自带的WordCount的程序没什么区别,不过在Reducer中一个是求最大值,一个是做累加,本质一样,比较简单。下面我们结合一个例子来实现。 测试数据 我们通过自己的模拟程序,生成了一组简单的测试样本数据。输入数据的格式,截取一个片段,如下所示: SG 253654006139495 253654006164392 619850464 KG 253654006225166 253654006252433 743485698 UZ 253654006248058 253654006271941 570409379 TT 253654006282019 253654006286839 23236775 BE 253654006276984 253654006301435 597874033 BO 253654006293624 253654006315946 498265375 SR 253654006308428 253654006330442 484613339 SV 253654006320312 253654006345405 629640166 LV 253654006330384 253654006359891 870680704 FJ 253654006351709 253654006374468 517965666 上面文本数据一行一行存储,一行包含4部分,分别表示: 国家代码 起始时间 截止时间 随机成本/权重估值 各个字段之间以空格号分隔。我们要计算的结果是,求各个国家(以

使用libsvm进行分类预测

使用libsvm,首先需要将实际待分类的内容或数据(训练数据,或预测数据)进行量化,然后通过libsvm提供的功能实现分类和预测。下面介绍使用libsvm的基本步骤。 准备训练数据 数据格式: <label1> <index1>:<value11> <index2>:<value12>... <label2> <index1>:<value21> <index2>:<value22>... <label3> <index1>:<value31> <index2>:<value32>... ... 每一行,表示以已定义的类别标签,以及属于该标签的各个属性值,每个属性值以“属性索引编号:属性值”的格式。一行内容表示一个类别属性以及与该类别相关的各个属性的值。属性的值,一般可以表示为“该属性隶属于该类别的程度”,越大,表示该属性更能决定属性该类别。 上面的数据必须使用数字类型,例如类别,可以通过不同的整数来表示不同的类别。 准备的原始训练样本数据存放在文件raw_data.txt中,内容如下所示: 1 1:0.4599 2:0.8718 3:0.1987 2 1:0.9765 2:0.2398 3:0.3999 3 1:0.0988 2:0.2432 3:0

MIna框架I/O Filter Chain层设计

I/O Filter Chain层是介于I/O Service层与I/O Handler层之间的一层,从它的命名上可以看出,这个层可以根据实际应用的需要,设置一组IoFilter来对I/O Service层与I/O Handler层之间传输数据进行过滤,任何需要在这两层之间进行处理的逻辑都可以放到IoFilter中。 我们看一下IoFilter的抽象层次设计,如图所示: 通过上述类图可见,要实现一个自定义的IoFilter,一般是直接实现IoFilterAdapter类。同时,Mina也给出了几类常用的开发IoFilter的实现类,如下所示: LoggingFilter记录所有事件和请求 ProtocolCodecFilter将到来的ByteBuffer转换成消息对象(POJO) CompressionFilter压缩数据 SSLFilter增加SSL – TLS – StartTLS支持 想要实现一个自定义的IoFilter实现类,只需要基于上述给出的几个实现类即可。 如果想要实现自己的IoFilter,可以参考如下例子: public class MyFilter extends IoFilterAdapter { @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { // Some logic here...

MIna框架I/O Service层设计

Mina从2.0版本以后,它的设计让人感觉到非常的优雅。它对网络应用通信框架的3个层进行了更好的抽象,以及在功能逻辑上的划分,同时又保证了 作为一个网络应用通信框架的统一。划分的3个层分别为: I/O Service层 I/O Filter Chain层 I/O Handler层 这里,我们重点关注I/O Service层。作为一个基于网络通信的应用,无论是服务器还是客户端角色,都要和网络I/O打交道,比如,服务器端需要创建服务器端Socket,监听指定端口并等待请求的带来,而客户端需要连接到服务器端指定的监听端口,使用网络服务。一般来说,这些I/O操作都比较复杂,而且很难在编 码中进行很好地控制,Mina的I/O Service层就是处理这些与实际的网络I/O相关的操作(事件)。 我们先看一下,对于服务器端和客户端,I/O Service层是如何设计的。类设计上的关系,作为这一层的最顶层抽象就是IoService接口类,如图所示: 通过上图,我们可以看到,IoService抽象的服务(功能)有如下几个: 管理IoSession:创建和删除IoSession,探测会话Idle状态 Filter Chain管理:处理过滤器链,允许用户修改过