Shark-0.9.0安装配置运行实践

Shark(Hive on Spark)是UC Lab为Spark设计并开源的一款数据仓库系统,提供了分布式SQL查询引擎,它能够完全兼容Hive。首先,我们通过下面的图,看一下Shark与Hive的关系(http://shark.cs.berkeley.edu/img/shark-hive-integration.png): 以前我们使用Hive分析HDFS中数据时,通过将HQL翻译成MapReduce作业(Job)在Hadoop集群上运行;而使用Shark可以像使用Hive一样容易,如HQL、Metastore、序列化格式、UDF等Shark都支持,不同的是Shark运行在Spark集群上执行计算,基于Spark系统所使用的RDD模型。官方文档给出的性能方面的数据是,使用Shark查询分析HDFS数据,能比Hive快30多倍,如图所示(http://shark.cs.berkeley.edu/img/perf.png): 下面,我们通过安装配置Shark来简单地体验一下。 准备软件包 jdk-7u25-linux-x64.tar.gz scala-2.10.3.tgz apache-maven-3.2.1-bin.tar.gz hadoop-1.2.1.tar.gz spark-0.9.0-incubating-bin-hadoop1.tgz hive-0.11-shark-0.9.0.tar.gz 环境变量配置 针对上述准备软件包,我们需要安装配置好JDK、Scala环境,保证Hado

RDD:基于内存的集群计算容错抽象

该论文来自Berkeley实验室,英文标题为:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing。下面的翻译,我是基于科学网翻译基础上进行优化、修改、补充,这篇译文翻译得很不错。在此基础上,我增加了来自英文原文的图和表格数据,以及译文中缺少的未翻译的部分。如果翻译措辞或逻辑有误,欢迎批评指正。 摘要 本文提出了分布式内存抽象的概念——弹性分布式数据集(RDD,Resilient Distributed Datasets),它具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大型集群上执行基于内存的计算。现有的数据流系统对两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域很常见;二是交互式数据挖掘工具。这两种情况下,将数据保存在内存中能够极大地提高性能。为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RDD上的批量操作来创建。尽管如此,RDD仍然足以表示很多类型的计算,包括MapReduce和专用的迭代编程模型(如Pregel)等。我们实现的RDD在迭代计

使用Java编写并运行Spark应用程序

我们首先提出这样一个简单的需求: 现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况。这里我拿我网站的日志记录行示例,如下所示: 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0" 121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0" 121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0" 121.205.1

CentOS 6.4下安装配置Spark-0.9集群

Spark是一个快速、通用的计算集群框架,它的内核使用Scala语言编写,它提供了Scala、Java和Python编程语言high-level API,使用这些API能够非常容易地开发并行处理的应用程序。 下面,我们通过搭建Spark集群计算环境,并进行简单地验证,来体验一下使用Spark计算的特点。无论从安装运行环境还是从编写处理程序(用Scala,Spark默认提供的Shell环境可以直接输入Scala代码进行数据处理),我们都会觉得比Hadoop MapReduce计算框架要简单得多,而且,Spark可以很好地与HDFS进行交互(从HDFS读取数据,以及写数据到HDFS中)。 安装配置 下载安装配置Scala wget http://www.scala-lang.org/files/archive/scala-2.10.3.tgz tar xvzf scala-2.10.3.tgz 在~/.bashrc中增加环境变量SCALA_HOME,并使之生效: export SCALA_HOME=/usr/scala/scala-2.10.3 export PATH=$PATH:$SCALA_HOME/bin 下载安装配置Spark 我们首先在主节点m1上配置Spark程序,然后将配置好的程序文件复制分发到集群的各个从结点上。下载解压缩: wget http://d3kbcqa49mib13.cloudfront

Oozie Coordinator使用及详解

Oozie所支持工作流,工作流定义通过将多个Hadoop Job的定义按照一定的顺序组织起来,然后作为一个整体按照既定的路径运行。一个工作流已经定义了,通过启动该工作流Job,就会执行该工作流中包含的多个Hadoop Job,直到完成,这就是工作流Job的生命周期。 那么,现在我们有一个工作流Job,希望每天半夜00:00启动运行,我们能够想到的就是通过写一个定时脚本来调度程序运行。如果我们有多个工作流Job,使用crontab的方式调用可能需要编写大量的脚本,还要通过脚本来控制好各个工作流Job的执行时序问题,不但脚本不好维护,而且监控也不方便。基于这样的背景,Oozie提出了Coordinator的概念,他们能够将每个工作流Job作为一个动作(Action)来运行,相当于工作流定义中的一个执行节点(我们可以理解为工作流的工作流),这样就能够将多个工作流Job组织起来,称为Coordinator Job,并指定触发时间和频率,还可以配置数据集、并发数等。一个Coordinator Job包含了在Job外部设置执行周期和频率的语义,类似于在工作流外部增加了一个协调器来管理这些工作流的工作流Job的运

Oozie工作流程定义详解

Oozie工作流程定义是一个DAG(Directed Acyclical Graphs)图,它由控制流节点(Control Flow Nodes)或动作节点(Action Nodes)组成,各个节点又是通过表征转移的箭线(transitions arrows)互相连通。对于工作流一般对应存在流程定义语言,例如jBPM是jPDL,大多数都是基于XML定义的,Oozie流程定义语言也是基于XML定义的,称为hPDL(Hadoop Process Definition Language)。 下面,我们详细说明工作流定义相关的内容: 工作流生命周期 在Oozie中,工作流的状态可能存在如下几种: 状态 含义说明 PREP 一个工作流Job第一次创建将处于PREP状态,表示工作流Job已经定义,但是没有运行。 RUNNING 当一个已经被创建的工作流Job开始执行的时候,就处于RUNNING状态。它不会达到结束状态,只能因为出错而结束,或者被挂起。 SUSPENDED 一个RUNNING状态的工作流Job会变成SUSPENDED状态,而且它会一直处于该状态,除非这个工作流Job被重新开始执行或者被杀死。 SUCCEEDED

Oozie-3.3.2安装配置运行实践

Oozie是一个开源的工作流调度系统,它能够管理逻辑复杂的多个Hadoop作业,按照指定的顺序将其协同运行起来。例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示: 通过Hadoop先将原始数据同步到HDFS上; 借助MapReduce计算框架对原始数据进行转换,生成的数据以分区表的形式存储到多张Hive表中; 需要对Hive中多个表的数据进行JOIN处理,得到一个明细数据Hive大表; 将明细数据进行复杂的统计分析,得到排序后的报表信息; 需要将统计分析得到的结果数据同步到业务系统中,供业务调用使用。 上述过程可以通过工作流系统来编排任务,最终生成一个工作流实例,然后每天定时启动运行这个实例即可。在这种依赖于Hadoop存储和处理能力要求的应用场景下,Oozie可能能够简化任务调度和执行。 这里,我们在CentOS 6.2系统下安装Oozie-3.3.2,需要安装相关的依赖软件包,下面我们一步一步地进行安装,包括安装配置依赖软件包。这里,我们使用MySQL数据库存储Oozie数据,Hadoop使用的是1.2.1版本。 安装Oozie Ser

Sqoop-1.4.4工具import和export使用详解

Sqoop可以在HDFS/Hive和关系型数据库之间进行数据的导入导出,其中主要使用了import和export这两个工具。这两个工具非常强大,提供了很多选项帮助我们完成数据的迁移和同步。比如,下面两个潜在的需求: 业务数据存放在关系数据库中,如果数据量达到一定规模后需要对其进行分析或同统计,单纯使用关系数据库可能会成为瓶颈,这时可以将数据从业务数据库数据导入(import)到Hadoop平台进行离线分析。 对大规模的数据在Hadoop平台上进行分析以后,可能需要将结果同步到关系数据库中作为业务的辅助数据,这时候需要将Hadoop平台分析后的数据导出(export)到关系数据库。 这里,我们介绍Sqoop完成上述基本应用场景所使用的import和export工具,通过一些简单的例子来说明这两个工具是如何做到的。 工具通用选项 import和export工具有些通用的选项,如下表所示: 选项 含义说明 --connect <jdbc-uri> 指定JDBC连接字符串 --connection-manager <class-name> 指定要使用的连接管理器类 --driver <class-name

使用Sqoop job工具同步数据

我们使用的是Sqoop-1.4.4,在进行关系型数据库与Hadoop/Hive数据同步的时候,如果使用--incremental选项,如使用append模式,我们需要记录一个--last-value的值,如果每次执行同步脚本的时候,都需要从日志中解析出来这个--last-value的值,然后重新设置脚本参数,才能正确同步,保证从关系型数据库同步到Hadoop/Hive的数据不发生重复的问题。 而且,我们我们需要管理我们使用的这些脚本,每次执行之前可能要获取指定参数值,或者修改参数。Sqoop也提供了一种比较方面的方式,那就是直接创建一个Sqoop job,通过job来管理特定的同步任务。就像我们前面提到的增量同步问题,通过创建sqoop job可以保存上一次同步时记录的--last-value的值,也就不用再费劲去解析获取了,每次想要同步,这个job会自动从job保存的数据中获取到。 sqoop job命令使用 Sqoop job相关的命令有两个: bin/sqoop job bin/sqoop-job 使用这两个都可以。我们先看看sqoop job命令的基本用法: 创建job:--create 删除job:--delete 执行job:--exec 显示job:--show 列出job:--list 下面

Sqoop-1.4.4安装配置及基本使用

Sqoop是Apache旗下的开源项目,能够在Hadoop和结构化存储系统之间进行数据传输(导入、导出),尤其是当下应用非常广泛的关系型数据库。通常,可能很多业务数据都存储在关系型数据库中,当数据规模达到一定程度后,使用关系型数据对数据进行分析可能会存在一定的瓶颈,如上亿级别记录的复杂统计分析等。将关系型数据库中的数据同步到Hadoop平台上,借助Hadoop平台的可扩展的优势,可以进行复杂的统计分析,处理效率会有一定程度的提升。 下面,我们通过安装配置Sqoop,来体验一下Sqoop在Hadoop和MySQL之间进行数据同步的特性。 准备和配置 我们在使用的主机及其应用进程部署情况,如下所示: 节点m1(10.95.3.56):Sqoop-1.4.4,Hive-0.12.0,Namenode、JobTracker MySQL节点(10.95.3.49):MySQL数据库 我们先验证Sqoop能够成功连接MySQL数据库,然后验证将MySQL数据库表中的数据,同步到Hive中。 首先,在CentOS 6.4下安装MySQL数据库(服务器IP为:10.95.3.49): rpm -qa | grep mysql sudo rpm -e --nodeps mysql yum list | grep mysql sudo yum ins

Hive JOIN使用详解

Hive是基于Hadoop平台的,它提供了类似SQL一样的查询语言HQL。有了Hive,如果使用过SQL语言,并且不理解Hadoop MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的MR Job。 对于最基本的HQL查询我们不再累述,这里主要说明Hive中进行统计分析时使用到的JOIN操作。在说明Hive JOIN之前,我们先简单说明一下,Hadoop执行MR Job的基本过程(运行机制),能更好的帮助我们理解HQL转换到底层的MR Job后是如何执行的。我们重点说明MapReduce执行过程中,从Map端到Reduce端这个过程(Shuffle)的执行情况,如图所示(来自《Hadoop: The Definitive Guide》): 基本执行过程,描述如下: 一个InputSplit输入到map,会运行我们实现的Mapper的处理逻辑,对数据进行映射操作。 map输出时,会首先将输出中间结果写入到map自带的buffer中(buffer默认大小为100M,可以通过io.sort.mb配置)。 map自带的buffer使用容

Hadoop-2.2.0集群安装配置实践

Hadoop 2.x和1.x已经大不相同了,应该说对于存储计算都更加通用了。Hadoop 2.x实现了用来管理集群资源的YARN框架,可以面向任何需要使用基于HDFS存储来计算的需要,当然MapReduce现在已经作为外围的插件式的计算框架,你可以根据需要开发或者选择合适的计算框架。目前,貌似对MapReduce支持还是比较好的,毕竟MapReduce框架已经还算成熟。其他一些基于YARN框架的标准也在开发中。 YARN框架的核心是资源的管理和分配调度,它比Hadoop 1.x中的资源分配的粒度更细了,也更加灵活了,它的前景应该不错。由于极大地灵活性,所以在使用过程中由于这些配置的灵活性,可能使用的难度也加大了一些。另外,我个人觉得,YARN毕竟还在发展之中,也有很多不成熟的地方,各种问题频频出现,资料也相对较少,官方文档有时更新也不是很及时,如果我选择做海量数据处理,可能YARN还不能满足生产环境的需要。如果完全使用MapReduce来做计算,还是选择相对更加成熟的Hadoop 1.x版本用于生产环境。 下面使用4台机器,操作系统为CentOS 6.4 64位,一台做主节点,另外三台做从节点,实践集