基于协同过滤的推荐方法

协同过滤(Collaborative Filtering, CF)是推荐系统广泛使用的一种技术,它主要通过考虑用户(User)与用户之间、物品(Item)与物品之间的相似度(Similarity),来向用户推荐物品,常被用在电商网站中。其中,在推荐系统中最常使用的协同过滤方法,有如下 4 种: 基于用户的协同过滤推荐 基于物品的协同过滤推荐 基于模型的协同过滤推荐 混合协同过滤推荐 上面 4 种方法中,基于用户的协同过滤推荐、基于物品的协同过滤推荐都是基于内存的协同过滤推荐,一般在数据量较小的应用场景下,可以直接在线使用的实时推荐方法;基于模型的协同过滤推荐一般用于离线计算,它采用机器学习的方法,一般首相将用户偏好行为数据分成 2 个数据集(有时可能会将数据集分成 k 个子集,采用交叉验证的方式来提高模型精度),一个为训练集,一个为测试集,使用训练集数据来训练出推荐模型,然后使用测试集数据来评估模型的精度,当满足特定精度时,可以将得到的推荐模型应用于实际线上环境;混合协同过滤推荐,是综合基于内存的协同过滤(基于用户的协同过滤推荐、基于物品的协

MapReduce V1:TaskTracker端启动Task流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。 TaskTracker周期性地向JobTracker发送心跳报告,在RPC调用返回结果后,解析结果得到JobTracker下发的运行Task的指令,即LaunchTaskAction,就会在TaskTracker节点上准备运行这个Task。Task的运行是在一个与TaskTracker进程隔离的JVM实例中执行,该JVM实例是通过org.apache.hadoop.mapred.Child来创建的,所以在创建Child VM实例之前,需要做大量的准备工作来启动Task运行。一个Task的启动过程,如下序列图所示: 通过上图,结合源码,我们将一个Task启动的过程,分为下面3个主要的步骤: 初始化跟踪Task运行的相关数据结构 准备Task运行所共享的Job资源 启动Task 下面,我们详细分析上面3个步骤的流程: 初始化跟踪Task运行的相关数据结构 如果是LaunchTaskAction,则TaskTracker会将该指令加入到一个启动Task的队列中,进行一步加载处理,如下所示: private void addToTaskQueue(LaunchTaskAction action) { if (action.getTask().isMapTask()) { mapLauncher.addToTaskQueue(action);

MapReduce V1:TaskTracker设计要点概要分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。 本文不打算深入地详细分析TaskTracker某个具体的处理流程,而是概要地分析TaskTracker在MapReduce框架中的主要负责处理那些事情,是我们能够在宏观上了解TaskTracker端都做了哪些工作。我尽量将TaskTracker端的全部要点内容提出来,但是涉及到详细的分析,只是点到为止,后续会对相应模块的处理流程结合代码进行分析。 TaskTracker主要负责MapReduce计算集群中Task运行的管理,所以TaskTracker要管理的事情比较多。一个MapReduce Job由很多的Task组成,而一个Job的所有Task被分成几个相斥的子集,每个子集被分配到某一个TaskTracker上去运行,所以一个TaskTracker管理运行了一个Job的所有Task的一个子集,也就是说TaskTracker不仅要维护每个Job对应的一个Task的子集,还要维护这些Task所属的Job的运行状态,对于Job/Task的状态的管理都是与JobTracker通过RPC通信保持状态的同步。 下面是TaskTracker端的主要组件,如下图所示: 为了了解TaskTracker中各个组件都负责处理哪些工作,我们通过下表来简要地说明各

ElasticSearch-2.0.0集群安装配置与API使用实践

ElasticSearch是基于全文搜索引擎库Lucene构建的分布式搜索引擎,我们可以直接使用ElasticSearch实现分布式搜索系统的搭建与使用,都知道,Lucene只是一个搜索框架,它提供了搜索引擎操作的基本API,如果要实现一个能够使用的搜索引擎系统,还需要自己基于Lucene的API去实现,工作量很大,而且还需要很好地掌握Lucene的底层实现原理。 ElasticSearch是一个完整的分布式搜索引擎系统,它的一些基本特性包括如下: 全文检索 提供插件机制,可以共享重用插件的功能 分布式文件存储 分布式实时索引和搜索 实时统计分析 可以横向扩展,支持大规模数据的搜索 简单易用的RESTful API 基于Replication实现了数据的高可用特性 与其他系统的集成 支持结构化和非结构化数据 灵活的Schema设计(Mappings) 支持多编程语言客户端 我个人感觉,ElasticSearch尽量屏蔽底层Lucene相关的技术细节,让你根本无从感觉底层Lucene相关的内容,这样你可以省去了了解Lucene 的成本,学习曲线比较平缓,不像Solr,如果想要构造负责的查询(Query),还是要对Lucene有所了解的。另外,在分布

MapReduce V1:JobTracker处理Heartbeat流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。这篇文章的内容,更多地主要是描述处理/交互流程性的东西,大部分流程图都是经过我梳理后画出来的(开始我打算使用序列图来描述流程,但是发现很多流程在单个对象内部都已经非常复杂,想要通过序列图表达有点担心描述不清,所以选择最基本的程序流程图),可能看起来比较枯燥,重点还是关注主要的处理流程要点,特别的地方我会刻意标示出来,便于理解。 JobTracker与TaskTracker之间通过org.apache.hadoop.mapred.InterTrackerProtocol协议来进行通信,TaskTracker通过该接口进行远程调用实现Heartbeat消息的发送,协议方法定义如下所示: HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) throws IOException; 通过该方法可以看出,最核心的Heartbeat报告数据都封装在Ta

MapReduce V1:JobTracker端Job/Task数据结构

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。在MapReduce程序运行的过程中,JobTracker端会在内存中维护一些与Job/Task运行相关的信息,了解这些内容对分析MapReduce程序执行流程的源码会非常有帮助。 在编写MapReduce程序时,我们是以Job为单位进行编程处理,一个应用程序可能由一组Job组成,而MapReduce框架给我们暴露的只是一些Map和Reduce的函数接口,在运行期它会构建对应MapTask和ReduceTask,所以我们知道一个Job是由一个或多个MapTask,以及0个或1个ReduceTask组成。而对于MapTask,它是根据输入的数据文件的的逻辑分片(InputSplit)而定的,通常有多少个分片就会有多少个MapTask;而对于ReduceTask,它会根据我们编写的MapReduce程序配置的个数来运行。 有了这些信息,我们能够预想到,在Job运行过程中,无非也需要维护与这些Job/Task相关的一些状态信息,通过一定的调度策略来管理Job/Task的运行。这里,我们主要关注JobTracker端的一些非常有用的数据结构:JobTracker、JobInProgress、TaskInProgress,来熟悉各种数据结构的定义及作用。 数据

MapReduce V1:Job提交流程之JobTracker端分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient、JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程。 上一篇我们分析了Job提交过程中JobClient端的处理流程(详见文章 MapReduce V1:Job提交流程之JobClient端分析),这里我们继续详细分析Job提交在JobTracker端的具体流程。通过阅读源码可以发现,这部分的处理逻辑还是有点复杂,经过梳理,更加细化清晰的流程,如下图所示: 上图中主要分为两大部分:一部分是JobClient基于RPC调用提交Job到JobTracker后,在JobTracker端触发TaskScheduler所注册的一系列Listener进行Job信息初始化;另一部分是JobTracker端监听Job队列的线程,监听到Job状态发生变更触发一系列Listener更新状态。我们从这两个方面展开分析: JobTracker接收Job提交 JobTracker接收到JobClient提交的Job,在JobTracker端具体执行流程,描述如下: JobClient基于JobSubmissionProtocol协议远程调用JobTracker的s

MapReduce V1:Job提交流程之JobClient端分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。 MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient、JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程。下图是《Hadoop权威指南》一书给出的MapReduce V1处理Job的抽象流程图: 如上图,我们展开阴影部分的处理逻辑,详细分析Job提交在JobClient端的具体流程。 在编写好MapReduce程序以后,需要将Job提交给JobTracker,那么我们就需要了解在提交Job的过程中,在JobClient端都做了哪些工作,或者说执行了哪些处理。在JobClient端提交Job的处理流程,如下图所示: 上图所描述的Job的提交流程,说明如下所示: 在MR程序中创建一个Job实例,设置Job状态 创建一个JobClient实例,准备将创建的Job实例提交到JobTracker 在创建JobClient的过程中,首先必须保证建立到JobTracker的RPC连接 基于JobSubmissionProtocol协议远程调用JobTracker获取一个新的Job ID 根据MR程序中配置的Job,在HDFS上创建Job相关目录,并将配置的tmpfiles、tmpja

Akka Cluster原理与应用

Akka集群原理 Akka集群支持去中心化的基于P2P的集群服务,没有单点故障(SPOF)问题,它主要是通过Gossip协议来实现。对于集群成员的状态,Akka提供了一种故障检测机制,能够自动发现出现故障而离开集群的成员节点,通过事件驱动的方式,将状态传播到整个集群的其它成员节点。 状态转移与故障检测 Akka内部为集群成员定义了一组有限状态(6种状态),并给出了一个状态转移矩阵,代码如下所示: private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] = Map( Joining -> Set(Up, Down, Removed), Up -> Set(Leaving, Down, Removed), Leaving -> Set(Exiting, Down, Removed), Down -> Set(Removed), Exiting -> Set(Removed, Down), Removed -> Set.empty[MemberStatus]) } Akka集群中的每个成员节点,都有可能处于上面的一种状态,在发生某些事件以后,会发生状态转移。需要注意的是,除了Down和Removed状态以外,节点处于其它任何一个状态时都有可能变成Do

Akka入门编程实践

Akka是使用Scala语言开发一个编程库,基于事件驱动的架构实现异步处理,它能够简化编写分布式应用程序。Akka中最核心的概念是Actor模型,它为编写分布式/并行计算应用程序提供了高层次抽象,在实际编程实践中,开发人员可以从对复杂网络通信细节的处理、多线程应用场景下对锁的管理中解脱出来。 Akka能够给应用程序带来的几个重要的特性是: 容错性 可伸缩性 异步性 事件驱动架构(EDA) 远程透明性 Actor是Akka中最核心的组件,以至于我们在编写基于Akka的应用程序时,大部分时间都会和Actor打交道,那么Actor到底是怎样的一种抽象呢?一个Actor对象封装了状态和行为,但是它不和外界其它的Actor共享状态,如果一个Actor想要和另一个Actor交互,能且只能通过发送消息来达到信息交换的目的。可见,一个Actor能够很好地保护其内部状态的安全。 与本地Actor通信 下面,我们从最简单的Actor编程来体验Akka的功能。首先,先定义几种类型的消息,后面会基于这些消息来进行通信,代码如下所示: package org.shirdrn.scala.akka object Start extends Serializable

Akka框架基本要点介绍

Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。本文基本上是基于Akka的官方文档(版本是2.3.12),通过自己的理解,来阐述Akka提供的一些组件或概念,另外总结了Akka的一些使用场景。 Actor 维基百科这样定义Actor模型: 在计算科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。 Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。 通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性: 提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发 提供了异步非阻塞的、高性能的事件驱动编程模型 超

Apache Pig简介与实践

Apache Pig是一个用来分析大数据集的平台,它由两部分组成:一部分是用于表达数据分析程序的高级脚本语言,另一部分是用于评估分析程序的基本工具。目前来看,Pig主要用于离线数据的批量处理应用场景,但是随着Pig的发展处理数据的速度会不断地提升,这可能依赖于Pig底层的执行引擎。比如,Pig通过指定执行模式,可以使用Hadoop的MapReduce计算引擎来实现数据处理,也可以使用基于Tez的计算引擎来实现(Tez是为了绕开MapReduce多阶段Job写磁盘而设计的DAG计算引擎,性能应该比MapReduce要快),看到Pig未来的发展路线图,以后可能会基于Storm或Spark计算平台实现底层计算引擎,那样速度会有极大地提升。 我们基于最新的0.15.0版本的Pig(Hadoop使用的是2.2.0版本),通过编写一些例子脚本来实践Pig的语言特性。 Pig安装与执行 Pig安装非常简单,只需要下载Pig包,然后解压缩即可: wget http://mirror.bit.edu.cn/apache/pig/pig-0.15.0/pig-0.15.0.tar.gz tar xvzf pig-0.15.0.tar.gz sudo ln -s /usr/local/pig-0.15.0 /usr/local/pig cd /usr/local/pig bi