MapReduce V1:TaskTracker设计要点概要分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。
本文不打算深入地详细分析TaskTracker某个具体的处理流程,而是概要地分析TaskTracker在MapReduce框架中的主要负责处理那些事情,是我们能够在宏观上了解TaskTracker端都做了哪些工作。我尽量将TaskTracker端的全部要点内容提出来,但是涉及到详细的分析,只是点到为止,后续会对相应模块的处理流程结合代码进行分析。
TaskTracker主要负责MapReduce计算集群中Task运行的管理,所以TaskTracker要管理的事情比较多。一个MapReduce Job由很多的Task组成,而一个Job的所有Task被分成几个相斥的子集,每个子集被分配到某一个TaskTracker上去运行,所以一个TaskTracker管理运行了一个Job的所有Task的一个子集,也就是说TaskTracker不仅要维护每个Job对应的一个Task的子集,还要维护这些Task所属的Job的运行状态,对于Job/Task的状态的管理都是与JobTracker通过RPC通信保持状态的同步。
下面是TaskTracker端的主要组件,如下图所示:
TaskTracker
为了了解TaskTracker中各个组件都负责处理哪些工作,我们通过下表来简要地说明各个组件的功能,如下表所示:

组件名称 组件功能
localFs: FileSystem TaskTracker本地文件系统,用来管理本地文件和目录
systemFS: FileSystem HDFS分布式文件系统,可以访问HDFS,用来检索Job/Task对应的资源文件等。
TrackerDistributedCacheManager TrackerDistributedCacheManager负责跨Job的缓存的管理,每个Job会对应一个TaskDistributedCacheManager实例。比如,每次TaskTracker被分配执行一个Job的一组Task,此时需要将该Job对应的资源文件和split相关数据从HDFS下载到TaskTracker本地,这些文件都需要进行管理,包括位置查询、文件访问、文件清理等。
TaskTrackerInstrumentation 用来管理TaskTracker上运行的一些Task的监控数据,主要是采集某些点的数据,如Task完成时、Task失败时、Task超时时等,目前该组件中都是空实现。
IndexCache Map阶段需要输出临时文件,要对MapTask的输出写入TaskTracker本地文件系统,需要对这些输出数据进行分区(partition),IndexCache负责管理分区文件的相关信息。
UserLogManager 负责管理TaskTracker节点上执行Task输出的日志信息,目前通过UserLogEvent定义了JVM_FINISHED、JOB_STARTED,、JOB_COMPLETED,、DELETE_JOB这4种事件,通过UserLogManager可以实现日志记录的输出。
ACLsManager 用来控制MapReduce管理员管理Job和Queue级别操作的访问权限。
NodeHealthCheckerService 用来检测节点之间的心跳服务。
ResourceCalculatorPlugin 用来计算系统的资源的插件,默认使用的是LinuxResourceCalculatorPlugin实现,可以方便地访问系统中的资源信息状态,如内存、CPU。
JvmManager 为了保证TaskTracker与实际Task(MapTask/ReduceTask)运行的隔离性,会将Task在单独的JVM实例中运行,JvmManager用来管理Task运行所在的JVM实例的信息,包括创建/销毁JVM实例等操作。
LocalStorage 管理TaskTracker本地文件系统的存储目录信息,如访问本地目录失败、检测目录可用性等。
LocalDirAllocator 管理TaskTracker本地目录分配,初始化LocalDirAllocator基于配置mapred.local.dir指定的目录,它采用的Round-Robin方式,在Task运行之前需要写一个启动Task的脚本文件,使用LocalDirAllocator来控制对应文件的读写。
JettyBugMonitor 在Map阶段输出中间结果,Reduce阶段会基于HTTP协议(基于Jetty)来拷贝属于自己的分区,为了解决Jetty已知的一些类存在的Bug,它们可能会影响TaskTracker,通过检测Jetty所在JVM实例使用CPU量,当超过配置的值时终止TaskTracker进程。
MapOutputServlet TaskTracker上启动一个Jetty容器,该Servlet用来负责暴露HTTP接口,供其它运行ReduceTask的TaskTracker拉取Map输出文件。
jobClient: InterTrackerProtocol 与JobTracker进行RPC通信的代理(Proxy)对象。
taskReportServer: Server TaskTracker节点上启动的RPC Server,在其上运行的Task,在运行过程中会向TaskTracker汇报状态,使TaskTracker知道Task的运行状态报告。
CleanupQueue 负责清理Job或Task运行完成后遗留下的一些不再使用的文件或目录。
TaskTrackerStatus 维护TaskTracker当前的状态信息,主要包括:TaskTracker的配置信息、TaskTracker上资源状态信息、TaskTracker上运行的Task的状态报告信息。
JobTokenSecretManager 用来管理Job运行的令牌相关信息。
ShuffleServerInstrumentation 管理Job运行过程中,shuffle阶段的监控数据,包括一组计数器:serverHandlerBusy、outputBytes、failedOutputs、successOutputs、exceptionsCaught。
TaskController 用来管理Task的初始化、完成、清理工作,还负责启动和终止Task运行所在的JVM实例。
HttpServer 用来处理Map输出的Jetty容器,其中MapOutputServlet会注册到该HTTP server中。
ShuffleExceptionTracker 跟踪Shuffle阶段出现异常情况的信息。
MapEventsFetcherThread 跟踪每个运行的Job对应的ReduceTask的Shuffle阶段,如果有Map完成,会对应着TaskCompletionEvent触发该线程,从已经完成的Map所在节点拷贝Map输出的中间结果数据,为ReduceTask运行做准备。
ReduceTaskLauncher 启动ReduceTask。
MapTaskLauncher 启动MapTask。
TaskCleanupThread 负责清理Job/Task执行完成后遗留的文件或目录。
TaskMemoryManagerThread 管理在该TaskTracker上运行的Task使用内存的信息。

通过上表,我们可以了解到TaskTracker端各个组件的基本功能,也稍微了解到组件之间的一些关系。下面,我们从TaskTracker抽象层次的视角,来分析组件之间的关系和交互,概要地描述一些主要的处理流程:

  • TaskTracker处理心跳响应
  • MapReduce Job恢复运行
  • Task隔离运行
  • 启动MapTask过程
  • 启动ReduceTask过程

下面,我们分别分析上述列举的5个处理流程:

TaskTracker处理心跳响应

TaskTracker周期性地向JobTracker发送心跳报告,将TaskTracker上运行的Task的状态信息、节点资源信息、节点健康状况信息封装到TaskTrackerStatus对象中,通过RPC调用heartbeat将心跳发送到JobTracker端,并返回HeartbeatResponse,其中心跳响应对象中包包含了JobTracker分配的任务,通过TaskAction这种指令(包括:LaunchTaskAction/KillTaskAction/CommitTaskAction)列表的方式进行指派。TaskTracker解析RPC调用返回的心跳响应,根据TaskAction指令列表,执行具体的操作。
TaskTracker处理心跳响应的流程,如下序列图所示:
TT.processHeartbeatResponse
TaskTracker收到心跳响应,首先会检查是否存在需要恢复的Job,如果存在,则会检查要进行恢复的Job的状态,从而将需要进行恢复的Job对应的Task加入到恢复队列中,等待调度运行。
接着,TAskTracker会检查TaskAction指令的类型,根据其实际类型,执行对应的处理流程:

  • 如果是LaunchTaskAction,则启动Task
  • 如果是KillTaskAction,则杀掉Task,修改TaskTracker维护的Job及Task状态,并清理临时数据
  • 如果是KillJobAction,则杀掉该Job,说明该Job已经完成(成功/失败),修改TaskTracker维护的Job及Task状态,并清理临时数据
  • 如果是CommitTaskAction,则说明对应的该Task已经执行完成,修改TaskTracker维护的Task即Job状态,最后还要清理临时数据

由于某些重要的处理流程,如启动一个Task的详细流程,我们会在后续单独写几篇文章,用更加合适的方式来详细分析。

MapReduce Job恢复运行

这里,我们介绍一下MapReduce计算中是如何实现Job的恢复的,包括JobTracker端和TaskTracker端之间的简单交互流程。
JobTracker存在一个系统目录(System Directory),默认值为/tmp/hadoop/mapred/system,也可以根据配置项mapred.system.dir指定该值。当JobClient提交一个Job到JobTracker时,JobTracker会首先将该Job的信息写入到JobTracker的系统目录下,每个Job对应一个以Job ID为名称的子目录,以便JobTracker因为重启,能够恢复这些Job的运行。我们可以看一下JobTracker中submitJob方法中保存Job信息的实现,代码如下所示:

    // Store the job-info in a file so that the job can be recovered
    // later (if at all)
    // Note: jobDir & jobInfo are owned by JT user since we are using
    // his fs object
    if (!recovered) {
      Path jobDir = getSystemDirectoryForJob(jobId); // Job目录,例如:/tmp/hadoop/mapred/system/job_200912121733_0002
      FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
      FSDataOutputStream out = fs.create(getSystemFileForJob(jobId)); // Job文件,例如:/tmp/hadoop/mapred/system/job_200912121733_0002/job-info
      jobInfo.write(out); // 将JobInfo结构对应的数据写入到Job文件
      out.close();
    }

上面代码中,JobInfo主要包含了JobID信息、用户名称、Job提交目录(例如,/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/ ,该目录是在JobClient提交Job时在HDFS上创建的,用于将该Job所需要的资源都拷贝到该Job对应的提交目录下面,便于后续JobTracker能够读取这些数据)。
如果JobTracker因为某些原因重新启动了,那么在JobTracker重启之后,需要从JobTracker的系统目中读取这些Job的信息,以便能够恢复这些尚未完成的Job的运行,并以HeartbeatResponse的结构,在TaskTracker发送Heartbeat的时候响应给TaskTracker,TaskTracker解析响应数据,然后去恢复这些Job的运行。
上面的序列图中,我们可以看到,当TaskTracker发送Heartbeat并收到响应后,从HeartbeatResponse中解析取出需要Recovered的Job,并进行处理,代码如下所示:

        // Check if the map-event list needs purging
        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs(); // 获取到需要Recovered的Job列表
        if (jobs.size() > 0) {
          synchronized (this) {
            // purge the local map events list
            for (JobID job : jobs) {
              RunningJob rjob;
              synchronized (runningJobs) { // TaskTracker维护的当前在其上运行的Job列表
                rjob = runningJobs.get(job);         
                if (rjob != null) {
                  synchronized (rjob) {
                    FetchStatus f = rjob.getFetchStatus(); 
                    if (f != null) {
                      f.reset();
                    }
                  }
                }
              }
            }

            // Mark the reducers in shuffle for rollback
            synchronized (shouldReset) {
              for (Map.Entry<TaskAttemptID, TaskInProgress> entry
                   : runningTasks.entrySet()) {
                if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
                  this.shouldReset.add(entry.getKey()); // 将处于SHUFFLE阶段的Task放到shouldReset集合中
                }
              }
            }
          }
        }

通过上面的代码我们可以看到,当JobTracker重启的时候,已经在TaskTracker上运行的属于某些Job的Task可能无法立即感知到,对应的Job仍然存在于TaskTracker的runningJobs集合中。在JobTracker重启之后,TaskTracker所发送的第一个Heartbeat返回的响应数据中,应该会存在需要Recovered的Job列表,所以这时在TaskTracker端只需要从runningJobs中取出需要Recovered的Job,并查看其是否存在Fetch状态,如果存在,应该重新设置状态(主要对应于MapEventsFetcherThread 维护的TaskCompletionEvent列表,触发ReduceTask拉取MapTask的输出中间结果),以便该Job的各个Task恢复运行。如果该ReduceTask正在运行于SHUFFLE阶段,需要将对应的Job的MapTask的输出拷贝到该ReduceTask所在的节点上,通过调用FetchStatus的reset方法重置状态,这样就重新恢复了ReduceTask的运行。

Task隔离运行

由于MapReduce用户程序包含用户代码,可能会存在Bug,为了不因为用户代码存在的Bug影响TaskTracker服务,所以MapReduce采用了隔离Task运行的方式来运行MapTask/ReduceTask。在运行Task时,会单独创建一个独立的JVM实例,让Task的代码再该JVM实例中加载运行,TaskTracker需要跟踪该JVM实例中运行的Task的状态。在TaskTracker端,加载一个运行Task的JVM实例,是通过org.apache.hadoop.mapred.Child类来实现。下面,我们看一下Child类是如何实现Task加载运行的,如下面序列图所示:
TT.Child.main
Child类包含一个入口主方法main,在运行的时候需要传递对应的参数,来运行MapTask和ReduceTask,通过上面序列图我们可以看出,命令行输入如下5个参数:

  • host:表示TaskTracker节点的主机名称
  • port:表示TaskTracker节点RPc端口号
  • taskID:表示启动的Task对应的TaskAttemptID,标识一个Task的一个运行实例
  • log location:表示该Task运行实例对应的日志文件的路径
  • JVM ID:表示该Task实例对应的JVMId信息,包括JobID、Task类型(MapTask/ReduceTask)、JVM编号(标识该JVM实例对应的id)

有了上述参数,就可以获取到一个Task运行所需要的全部资源,如一个Task处理哪一个Split,一个Task对应的Job配置信息,还可以方便TaskTracker监控该Task实例所在的JVM的状态。该Child创建时,会创建一个到TaskTracker的RPC代理对象,通过该RPC连接向TaskTracker汇报Task执行进度及其状态信息。然后,一切运行Task的基本条件都已经具备,接下来从该Task对应的Job的代码(job.jar)开始加载任务处理类,如果是MapTask则执行MapTask运行的处理流程,如果是ReduceTask则执行ReduceTask的处理流程,最后,断开Task汇报状态的RPc连接,Task运行结束。

启动MapTask过程

在Child类中加载启动Task,如果是MapTask,则执行MapTask对应的处理流程,如下序列图所示:
TT.MapTask.run
启动一个MapTask运行,包含4个阶段,我们通过运行各个阶段的方法来表示:

  • runJobCleanupTask():清理Job对应的相关目录和文件
  • runJobSetupTask():创建Job运行所需要的相关目录和文件
  • runTaskCleanupTask():清理一个Task对应的工作目录下与Task相关的目录或文件
  • runNewMapper()/runOldMapper():调用用户编写的MapReduce程序中的Mapper中的处理逻辑

在MapTask运行过程中, 如果阶段或者状态发生变化,要与TaskTracker进行通信,汇报状态,并更新TaskTracker维护的关于Task和Job对应的状态数据。最后,Task运行完成,也要通知TaskTracker。

启动ReduceTask过程

在Child类中加载启动Task,如果是ReduceTask,则执行ReduceTask对应的处理流程,如下序列图所示:
TT.ReduceTask.run
启动一个ReduceTask运行,与MapTask的处理流程有很大的不同,它包含3个阶段,如下所示:

  • copy阶段:从运行MapTask的TaskTracker节点,拷贝属于该ReduceTask对应的Job所包含的MapTask的输出中间结果数据,这些数据存储在该Reduce所在TaskTracker的本地文件系统(可能会放在内存中),为后续阶段准备数据
  • sort阶段:对从MapTask拉取过来的数据进行合并、排序
  • reduce阶段:调用用户编写的MapReduce程序中的Reducer中的处理逻辑

执行reduce阶段,比MapTask复杂的多。在ReduceTask运行过程中,也会周期性地与TaskTracker通信,汇报Task运行进度和状态,以保证与TaskTracker所维护的Task的状态数据同步。当ReduceTask完成后,如果有输出的话,最终的结果数据会输出到HDFS中保存。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>