使用Java编写并运行Spark应用程序

我们首先提出这样一个简单的需求: 现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况。这里我拿我网站的日志记录行示例,如下所示: 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0" 121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0" 121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0" 121.205.1

CentOS 6.4下安装配置Spark-0.9集群

Spark是一个快速、通用的计算集群框架,它的内核使用Scala语言编写,它提供了Scala、Java和Python编程语言high-level API,使用这些API能够非常容易地开发并行处理的应用程序。 下面,我们通过搭建Spark集群计算环境,并进行简单地验证,来体验一下使用Spark计算的特点。无论从安装运行环境还是从编写处理程序(用Scala,Spark默认提供的Shell环境可以直接输入Scala代码进行数据处理),我们都会觉得比Hadoop MapReduce计算框架要简单得多,而且,Spark可以很好地与HDFS进行交互(从HDFS读取数据,以及写数据到HDFS中)。 安装配置 下载安装配置Scala wget http://www.scala-lang.org/files/archive/scala-2.10.3.tgz tar xvzf scala-2.10.3.tgz 在~/.bashrc中增加环境变量SCALA_HOME,并使之生效: export SCALA_HOME=/usr/scala/scala-2.10.3 export PATH=$PATH:$SCALA_HOME/bin 下载安装配置Spark 我们首先在主节点m1上配置Spark程序,然后将配置好的程序文件复制分发到集群的各个从结点上。下载解压缩: wget http://d3kbcqa49mib13.cloudfront

Oozie Coordinator使用及详解

Oozie所支持工作流,工作流定义通过将多个Hadoop Job的定义按照一定的顺序组织起来,然后作为一个整体按照既定的路径运行。一个工作流已经定义了,通过启动该工作流Job,就会执行该工作流中包含的多个Hadoop Job,直到完成,这就是工作流Job的生命周期。 那么,现在我们有一个工作流Job,希望每天半夜00:00启动运行,我们能够想到的就是通过写一个定时脚本来调度程序运行。如果我们有多个工作流Job,使用crontab的方式调用可能需要编写大量的脚本,还要通过脚本来控制好各个工作流Job的执行时序问题,不但脚本不好维护,而且监控也不方便。基于这样的背景,Oozie提出了Coordinator的概念,他们能够将每个工作流Job作为一个动作(Action)来运行,相当于工作流定义中的一个执行节点(我们可以理解为工作流的工作流),这样就能够将多个工作流Job组织起来,称为Coordinator Job,并指定触发时间和频率,还可以配置数据集、并发数等。一个Coordinator Job包含了在Job外部设置执行周期和频率的语义,类似于在工作流外部增加了一个协调器来管理这些工作流的工作流Job的运

Oozie工作流程定义详解

Oozie工作流程定义是一个DAG(Directed Acyclical Graphs)图,它由控制流节点(Control Flow Nodes)或动作节点(Action Nodes)组成,各个节点又是通过表征转移的箭线(transitions arrows)互相连通。对于工作流一般对应存在流程定义语言,例如jBPM是jPDL,大多数都是基于XML定义的,Oozie流程定义语言也是基于XML定义的,称为hPDL(Hadoop Process Definition Language)。 下面,我们详细说明工作流定义相关的内容: 工作流生命周期 在Oozie中,工作流的状态可能存在如下几种: 状态 含义说明 PREP 一个工作流Job第一次创建将处于PREP状态,表示工作流Job已经定义,但是没有运行。 RUNNING 当一个已经被创建的工作流Job开始执行的时候,就处于RUNNING状态。它不会达到结束状态,只能因为出错而结束,或者被挂起。 SUSPENDED 一个RUNNING状态的工作流Job会变成SUSPENDED状态,而且它会一直处于该状态,除非这个工作流Job被重新开始执行或者被杀死。 SUCCEEDED

Oozie-3.3.2安装配置运行实践

Oozie是一个开源的工作流调度系统,它能够管理逻辑复杂的多个Hadoop作业,按照指定的顺序将其协同运行起来。例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示: 通过Hadoop先将原始数据同步到HDFS上; 借助MapReduce计算框架对原始数据进行转换,生成的数据以分区表的形式存储到多张Hive表中; 需要对Hive中多个表的数据进行JOIN处理,得到一个明细数据Hive大表; 将明细数据进行复杂的统计分析,得到排序后的报表信息; 需要将统计分析得到的结果数据同步到业务系统中,供业务调用使用。 上述过程可以通过工作流系统来编排任务,最终生成一个工作流实例,然后每天定时启动运行这个实例即可。在这种依赖于Hadoop存储和处理能力要求的应用场景下,Oozie可能能够简化任务调度和执行。 这里,我们在CentOS 6.2系统下安装Oozie-3.3.2,需要安装相关的依赖软件包,下面我们一步一步地进行安装,包括安装配置依赖软件包。这里,我们使用MySQL数据库存储Oozie数据,Hadoop使用的是1.2.1版本。 安装Oozie Ser

Sqoop-1.4.4工具import和export使用详解

Sqoop可以在HDFS/Hive和关系型数据库之间进行数据的导入导出,其中主要使用了import和export这两个工具。这两个工具非常强大,提供了很多选项帮助我们完成数据的迁移和同步。比如,下面两个潜在的需求: 业务数据存放在关系数据库中,如果数据量达到一定规模后需要对其进行分析或同统计,单纯使用关系数据库可能会成为瓶颈,这时可以将数据从业务数据库数据导入(import)到Hadoop平台进行离线分析。 对大规模的数据在Hadoop平台上进行分析以后,可能需要将结果同步到关系数据库中作为业务的辅助数据,这时候需要将Hadoop平台分析后的数据导出(export)到关系数据库。 这里,我们介绍Sqoop完成上述基本应用场景所使用的import和export工具,通过一些简单的例子来说明这两个工具是如何做到的。 工具通用选项 import和export工具有些通用的选项,如下表所示: 选项 含义说明 --connect <jdbc-uri> 指定JDBC连接字符串 --connection-manager <class-name> 指定要使用的连接管理器类 --driver <class-name

使用Sqoop job工具同步数据

我们使用的是Sqoop-1.4.4,在进行关系型数据库与Hadoop/Hive数据同步的时候,如果使用--incremental选项,如使用append模式,我们需要记录一个--last-value的值,如果每次执行同步脚本的时候,都需要从日志中解析出来这个--last-value的值,然后重新设置脚本参数,才能正确同步,保证从关系型数据库同步到Hadoop/Hive的数据不发生重复的问题。 而且,我们我们需要管理我们使用的这些脚本,每次执行之前可能要获取指定参数值,或者修改参数。Sqoop也提供了一种比较方面的方式,那就是直接创建一个Sqoop job,通过job来管理特定的同步任务。就像我们前面提到的增量同步问题,通过创建sqoop job可以保存上一次同步时记录的--last-value的值,也就不用再费劲去解析获取了,每次想要同步,这个job会自动从job保存的数据中获取到。 sqoop job命令使用 Sqoop job相关的命令有两个: bin/sqoop job bin/sqoop-job 使用这两个都可以。我们先看看sqoop job命令的基本用法: 创建job:--create 删除job:--delete 执行job:--exec 显示job:--show 列出job:--list 下面

Sqoop-1.4.4安装配置及基本使用

Sqoop是Apache旗下的开源项目,能够在Hadoop和结构化存储系统之间进行数据传输(导入、导出),尤其是当下应用非常广泛的关系型数据库。通常,可能很多业务数据都存储在关系型数据库中,当数据规模达到一定程度后,使用关系型数据对数据进行分析可能会存在一定的瓶颈,如上亿级别记录的复杂统计分析等。将关系型数据库中的数据同步到Hadoop平台上,借助Hadoop平台的可扩展的优势,可以进行复杂的统计分析,处理效率会有一定程度的提升。 下面,我们通过安装配置Sqoop,来体验一下Sqoop在Hadoop和MySQL之间进行数据同步的特性。 准备和配置 我们在使用的主机及其应用进程部署情况,如下所示: 节点m1(10.95.3.56):Sqoop-1.4.4,Hive-0.12.0,Namenode、JobTracker MySQL节点(10.95.3.49):MySQL数据库 我们先验证Sqoop能够成功连接MySQL数据库,然后验证将MySQL数据库表中的数据,同步到Hive中。 首先,在CentOS 6.4下安装MySQL数据库(服务器IP为:10.95.3.49): rpm -qa | grep mysql sudo rpm -e --nodeps mysql yum list | grep mysql sudo yum ins

Hive JOIN使用详解

Hive是基于Hadoop平台的,它提供了类似SQL一样的查询语言HQL。有了Hive,如果使用过SQL语言,并且不理解Hadoop MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的MR Job。 对于最基本的HQL查询我们不再累述,这里主要说明Hive中进行统计分析时使用到的JOIN操作。在说明Hive JOIN之前,我们先简单说明一下,Hadoop执行MR Job的基本过程(运行机制),能更好的帮助我们理解HQL转换到底层的MR Job后是如何执行的。我们重点说明MapReduce执行过程中,从Map端到Reduce端这个过程(Shuffle)的执行情况,如图所示(来自《Hadoop: The Definitive Guide》): 基本执行过程,描述如下: 一个InputSplit输入到map,会运行我们实现的Mapper的处理逻辑,对数据进行映射操作。 map输出时,会首先将输出中间结果写入到map自带的buffer中(buffer默认大小为100M,可以通过io.sort.mb配置)。 map自带的buffer使用容

CentOS 6.4系统MySQL主从复制基本配置实践

对于MySQL数据库一般用途的主从复制,可以实现数据的备份(如果希望在主节点失效后,能够使从节点自动接管,就需要更加复杂的配置,这里暂时先不考虑),如果主节点出现硬件故障,数据库服务器可以直接手动切换成备份节点(从节点),继续提供服务。基本的主从复制配置起来非常容易,这里我们做个简单的记录总结。 我们选择两台服务器来进行MySQL的主从复制实践,一台m1作为主节点,另一台nn作为从节点。 两台机器上都需要安装MySQL数据库,如果想要卸掉默认安装的,可以执行如下命令: sudo rpm -e --nodeps mysql yum list | grep mysql 现在可以在CentOS 6.4上直接执行如下命令进行安装: sudo yum install -y mysql-server mysql mysql-deve 为root用户设置密码: mysqladmin -u root password 'shiyanjun' 然后可以直接通过MySQL客户端登录: mysql -u root -p 主节点配置 首先,考虑到数据库的安全,以及便于管理,我们需要在主节点m1上增加一个专用的复制用户,使得任意想要从主节点进行复制从节点都必须使用这个账号: CREATE USER repl

Hadoop-2.2.0集群安装配置实践

Hadoop 2.x和1.x已经大不相同了,应该说对于存储计算都更加通用了。Hadoop 2.x实现了用来管理集群资源的YARN框架,可以面向任何需要使用基于HDFS存储来计算的需要,当然MapReduce现在已经作为外围的插件式的计算框架,你可以根据需要开发或者选择合适的计算框架。目前,貌似对MapReduce支持还是比较好的,毕竟MapReduce框架已经还算成熟。其他一些基于YARN框架的标准也在开发中。 YARN框架的核心是资源的管理和分配调度,它比Hadoop 1.x中的资源分配的粒度更细了,也更加灵活了,它的前景应该不错。由于极大地灵活性,所以在使用过程中由于这些配置的灵活性,可能使用的难度也加大了一些。另外,我个人觉得,YARN毕竟还在发展之中,也有很多不成熟的地方,各种问题频频出现,资料也相对较少,官方文档有时更新也不是很及时,如果我选择做海量数据处理,可能YARN还不能满足生产环境的需要。如果完全使用MapReduce来做计算,还是选择相对更加成熟的Hadoop 1.x版本用于生产环境。 下面使用4台机器,操作系统为CentOS 6.4 64位,一台做主节点,另外三台做从节点,实践集

使用libsvm实现文本分类

文本分类,首先它是分类问题,应该对应着分类过程的两个重要的步骤,一个是使用训练数据集训练分类器,另一个就是使用测试数据集来评价分类器的分类精度。然而,作为文本分类,它还具有文本这样的约束,所以对于文本来说,需要额外的处理过程,我们结合使用libsvm从宏观上总结一下,基于libsvm实现文本分类实现的基本过程,如下所示: 选择文本训练数据集和测试数据集:训练集和测试集都是类标签已知的; 训练集文本预处理:这里主要包括分词、去停用词、建立词袋模型(倒排表); 选择文本分类使用的特征向量(词向量):最终的目标是使得最终选出的特征向量在多个类别之间具有一定的类别区分度,可以使用相关有效的技术去实现特征向量的选择,由于分词后得到大量的词,通过选择降维技术能很好地减少计算量,还能维持分类的精度; 输出libsvm支持的量化的训练样本集文件:类别名称、特征向量中每个词元素分别到数字编号的映射转换,以及基于类别和特征向量来量化文本训练集,能够满足使用libsvm训练所需要的数据格式; 测试数据集预处理:同样包括分词(需要和训练