Flume(NG)架构设计要点及配置实践

Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元,带有一个可选的消息头 Flow:Event从源点到达目的点的迁移的抽象 Client:操作位于源点处的Event,将其发送到Flume Agent Agent:一个独立的Flume进程,包含组件Source、Channel、Sink Source:用来消费传递到该组件的Event Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话) Flume NG架构,如图所示: 外部系统产生日志,直接通过Flume的Agent的Source组件将事件(如日志行)发送到中间临时的channel组件,最后传

HDFS读文件过程分析:获取文件对应的Block列表

在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据。对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用DFSClient.DFSDataInputStream类来读取HDFS上一个文件的一段代码来看,如下所示: package org.shirdrn.hadoop.hdfs; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class HdfsFileReader { public static void main(String[] args) { String file = "hdfs://hadoop-cluster-m:8020/data/logs/basis_user_behavior/201405071237_10_10_1_73.log"; Path path = new Path(file); Configuration conf = new Configuration();

Hive-0.12.0的Web接口HWI安装、配置、改造及使用

使用Hive的HWI接口,可以通过在Web页面上提交HQL查询操作,并浏览查询结果数据。默认情况下,HWI只支持浏览结果数据,不能够下载查询结果文件(当然,HWI可能也是考虑通过Web下载大量的结果数据,对服务器造成压力,或者处于安全方面的考虑)。我们对HWI进行了简单的改造,改造内容主要是增加了一个内置的文件服务器,可以通过页面进行查询,然后下载结果文件。 HWI安装配置 首先,要保证Hadoop集群正常运行,Hive能够正常使用。 先要安装Ant,如下所示: wget http://mirrors.hust.edu.cn/apache//ant/binaries/apache-ant-1.9.4-bin.tar.gz tar xvzf apache-ant-1.9.4-bin.tar.gz ln -s /usr/local/apache-ant-1.9.4-bin /usr/local/ant 修改Hive的环境配置文件hive-env.sh,增加如下配置内容: export ANT_LIB=/usr/local/ant 将如下JAR文件拷贝到${HIVE_HOME}/lib目录下面: // 用于编译JSP文件 jasper-compiler-5.5.23.jar jasper-runtime-5.5.23.jar // 替换默认的servlet-api-2.5-20081211.jar,我使用的是apache-tomcat-7.0.53/lib下面的s

Spring+Mybatis实现动态SQL查询

在报表类应用中,通常需要根据不同的维度去组合复杂的查询条件,然后构造SQL去执行查询。如果只是通过在程序中简单地拼接SQL语句,工作量会非常大,而且代码可能也非常难以维护。Mybatis支持动态SQL查询功能,可以通过配置动态的SQL来简化程序代码中复杂性,不过,这个颇有点XML编程的韵味,通过XML来处理复杂的数据判断、循环的功能,其实也很好理解。 准备工作 下面,我们首先创建一个MySQL示例表,如下所示: CREATE TABLE `traffic_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `domain` varchar(64) NOT NULL, `traffic_host` varchar(64) NOT NULL, `month` varchar(8) NOT NULL, `monthly_traffic` int(11) DEFAULT '0', `global_traffic_rank` int(11) DEFAULT '0', `native_traffic_rank` int(11) DEFAULT '0', `rank_in_country` varchar(64) DEFAULT NULL, `address` varchar(200) DEFAULT NULL, `email` varchar(50) DEFAULT NULL, `traffic_type` int(2) DEFAULT '-1', `status` int(2) DEFAULT '0', `cr

HDFS格式化过程分析

我们知道,Namenode启动时可以指定不同的选项,当指定-format选项时,就是格式化Namenode,可以在Namenode类中看到格式化的方法,方法签名如下所示: private static boolean format(Configuration conf, boolean isConfirmationNeeded, boolean isInteractive) throws IOException 在该方法中,首先调用FSNamesystem类的方法,获取到待格式化的name目录和edit目录: Collection<File> editDirsToFormat = Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf); FSNamesystem.getNamespaceEditsDirs(conf); 跟踪调用FSNamesystem类的方法,可以看到,实际上获取到的目录为: name目录:是根据配置的dfs.name.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 edit目录:是根据配置的dfs.name.edits.dir属性,如果没有配置,默认使用目录/tmp/hadoop/dfs/name。 在上面format方法中,创建对应的name目录和edit目录,对应如下代码行: FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,

Shark-0.9.0安装配置运行实践

Shark(Hive on Spark)是UC Lab为Spark设计并开源的一款数据仓库系统,提供了分布式SQL查询引擎,它能够完全兼容Hive。首先,我们通过下面的图,看一下Shark与Hive的关系(http://shark.cs.berkeley.edu/img/shark-hive-integration.png): 以前我们使用Hive分析HDFS中数据时,通过将HQL翻译成MapReduce作业(Job)在Hadoop集群上运行;而使用Shark可以像使用Hive一样容易,如HQL、Metastore、序列化格式、UDF等Shark都支持,不同的是Shark运行在Spark集群上执行计算,基于Spark系统所使用的RDD模型。官方文档给出的性能方面的数据是,使用Shark查询分析HDFS数据,能比Hive快30多倍,如图所示(http://shark.cs.berkeley.edu/img/perf.png): 下面,我们通过安装配置Shark来简单地体验一下。 准备软件包 jdk-7u25-linux-x64.tar.gz scala-2.10.3.tgz apache-maven-3.2.1-bin.tar.gz hadoop-1.2.1.tar.gz spark-0.9.0-incubating-bin-hadoop1.tgz hive-0.11-shark-0.9.0.tar.gz 环境变量配置 针对上述准备软件包,我们需要安装配置好JDK、Scala环境,保证Hado

RDD:基于内存的集群计算容错抽象

该论文来自Berkeley实验室,英文标题为:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing。下面的翻译,我是基于科学网翻译基础上进行优化、修改、补充,这篇译文翻译得很不错。在此基础上,我增加了来自英文原文的图和表格数据,以及译文中缺少的未翻译的部分。如果翻译措辞或逻辑有误,欢迎批评指正。 摘要 本文提出了分布式内存抽象的概念——弹性分布式数据集(RDD,Resilient Distributed Datasets),它具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大型集群上执行基于内存的计算。现有的数据流系统对两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域很常见;二是交互式数据挖掘工具。这两种情况下,将数据保存在内存中能够极大地提高性能。为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RDD上的批量操作来创建。尽管如此,RDD仍然足以表示很多类型的计算,包括MapReduce和专用的迭代编程模型(如Pregel)等。我们实现的RDD在迭代计

使用Java编写并运行Spark应用程序

我们首先提出这样一个简单的需求: 现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况。这里我拿我网站的日志记录行示例,如下所示: 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0" 121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0" 121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0" 121.205.1

CentOS 6.4下安装配置Spark-0.9集群

Spark是一个快速、通用的计算集群框架,它的内核使用Scala语言编写,它提供了Scala、Java和Python编程语言high-level API,使用这些API能够非常容易地开发并行处理的应用程序。 下面,我们通过搭建Spark集群计算环境,并进行简单地验证,来体验一下使用Spark计算的特点。无论从安装运行环境还是从编写处理程序(用Scala,Spark默认提供的Shell环境可以直接输入Scala代码进行数据处理),我们都会觉得比Hadoop MapReduce计算框架要简单得多,而且,Spark可以很好地与HDFS进行交互(从HDFS读取数据,以及写数据到HDFS中)。 安装配置 下载安装配置Scala wget http://www.scala-lang.org/files/archive/scala-2.10.3.tgz tar xvzf scala-2.10.3.tgz 在~/.bashrc中增加环境变量SCALA_HOME,并使之生效: export SCALA_HOME=/usr/scala/scala-2.10.3 export PATH=$PATH:$SCALA_HOME/bin 下载安装配置Spark 我们首先在主节点m1上配置Spark程序,然后将配置好的程序文件复制分发到集群的各个从结点上。下载解压缩: wget http://d3kbcqa49mib13.cloudfront

Oozie Coordinator使用及详解

Oozie所支持工作流,工作流定义通过将多个Hadoop Job的定义按照一定的顺序组织起来,然后作为一个整体按照既定的路径运行。一个工作流已经定义了,通过启动该工作流Job,就会执行该工作流中包含的多个Hadoop Job,直到完成,这就是工作流Job的生命周期。 那么,现在我们有一个工作流Job,希望每天半夜00:00启动运行,我们能够想到的就是通过写一个定时脚本来调度程序运行。如果我们有多个工作流Job,使用crontab的方式调用可能需要编写大量的脚本,还要通过脚本来控制好各个工作流Job的执行时序问题,不但脚本不好维护,而且监控也不方便。基于这样的背景,Oozie提出了Coordinator的概念,他们能够将每个工作流Job作为一个动作(Action)来运行,相当于工作流定义中的一个执行节点(我们可以理解为工作流的工作流),这样就能够将多个工作流Job组织起来,称为Coordinator Job,并指定触发时间和频率,还可以配置数据集、并发数等。一个Coordinator Job包含了在Job外部设置执行周期和频率的语义,类似于在工作流外部增加了一个协调器来管理这些工作流的工作流Job的运

Oozie工作流程定义详解

Oozie工作流程定义是一个DAG(Directed Acyclical Graphs)图,它由控制流节点(Control Flow Nodes)或动作节点(Action Nodes)组成,各个节点又是通过表征转移的箭线(transitions arrows)互相连通。对于工作流一般对应存在流程定义语言,例如jBPM是jPDL,大多数都是基于XML定义的,Oozie流程定义语言也是基于XML定义的,称为hPDL(Hadoop Process Definition Language)。 下面,我们详细说明工作流定义相关的内容: 工作流生命周期 在Oozie中,工作流的状态可能存在如下几种: 状态 含义说明 PREP 一个工作流Job第一次创建将处于PREP状态,表示工作流Job已经定义,但是没有运行。 RUNNING 当一个已经被创建的工作流Job开始执行的时候,就处于RUNNING状态。它不会达到结束状态,只能因为出错而结束,或者被挂起。 SUSPENDED 一个RUNNING状态的工作流Job会变成SUSPENDED状态,而且它会一直处于该状态,除非这个工作流Job被重新开始执行或者被杀死。 SUCCEEDED

Oozie-3.3.2安装配置运行实践

Oozie是一个开源的工作流调度系统,它能够管理逻辑复杂的多个Hadoop作业,按照指定的顺序将其协同运行起来。例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示: 通过Hadoop先将原始数据同步到HDFS上; 借助MapReduce计算框架对原始数据进行转换,生成的数据以分区表的形式存储到多张Hive表中; 需要对Hive中多个表的数据进行JOIN处理,得到一个明细数据Hive大表; 将明细数据进行复杂的统计分析,得到排序后的报表信息; 需要将统计分析得到的结果数据同步到业务系统中,供业务调用使用。 上述过程可以通过工作流系统来编排任务,最终生成一个工作流实例,然后每天定时启动运行这个实例即可。在这种依赖于Hadoop存储和处理能力要求的应用场景下,Oozie可能能够简化任务调度和执行。 这里,我们在CentOS 6.2系统下安装Oozie-3.3.2,需要安装相关的依赖软件包,下面我们一步一步地进行安装,包括安装配置依赖软件包。这里,我们使用MySQL数据库存储Oozie数据,Hadoop使用的是1.2.1版本。 安装Oozie Ser