MapReduce V1:JobTracker端Job/Task数据结构

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。在MapReduce程序运行的过程中,JobTracker端会在内存中维护一些与Job/Task运行相关的信息,了解这些内容对分析MapReduce程序执行流程的源码会非常有帮助。
在编写MapReduce程序时,我们是以Job为单位进行编程处理,一个应用程序可能由一组Job组成,而MapReduce框架给我们暴露的只是一些Map和Reduce的函数接口,在运行期它会构建对应MapTask和ReduceTask,所以我们知道一个Job是由一个或多个MapTask,以及0个或1个ReduceTask组成。而对于MapTask,它是根据输入的数据文件的的逻辑分片(InputSplit)而定的,通常有多少个分片就会有多少个MapTask;而对于ReduceTask,它会根据我们编写的MapReduce程序配置的个数来运行。
有了这些信息,我们能够预想到,在Job运行过程中,无非也需要维护与这些Job/Task相关的一些状态信息,通过一定的调度策略来管理Job/Task的运行。这里,我们主要关注JobTracker端的一些非常有用的数据结构:JobTracker、JobInProgress、TaskInProgress,来熟悉各种数据结构的定义及作用。

数据结构总体抽象

MapReduce框架就为了运行Job,所以我们基于Job的抽象来对JobTracker端的相关对象进行抽象,总体上理解它们之间的关系,如下图所示:
jobtracker-tip-taid
在JobTracker端,通过维护JobInProgress的信息来跟踪Job的运行生命周期,那么,JobTracker端肯定有一个用来维护所有Job状态的JobInProgress对象集合。而Job又是由Task组成,所以自然而然JobInProgress中应该有对Task运行状态的维护,Task的状态在JobTracker端通过TaskInProgress来抽象。一个Task可能运行失败,所以可能经过多次运行才能成功,而每一次运行会对应一个TaskAttempID,那么一个TaskInProgress又可能对应着多个TaskAttempID。

TaskInProgress数据结构

  • TaskAttemptID结构

一个TaskInProgress结构中包含了3个TaskAttemptID类型的数据,如下图所示:
tip-task-attempt-ids

  • TaskSplitMetaInfo结构

JobTracker会创建每一个Task需要运行Split的信息,包含了该Split所在的位置信息、起始偏移量、总输入字节数,结构图如下所示:
tip-task-split-meta-info

  • 其他结构
结构图 说明
tip-tasks
TreeSet<TaskAttemptID> tasks

一个TaskInProgress包含的TaskAttemptID的集合。
一个Task(MapTask/ReduceTask)可能包含多个TaskAttemptID在TaskTracker上运行,比如一个ReduceTask在TaskTracker上运行,同时可能存在一个推测执行的ReduceTask,他们对应了2个不同的TaskAttemptID。

tip-active-tasks
TreeMap<TaskAttemptID, String> activeTasks

维护了当前运行的Task,该Task运行在哪个TaskTracker上。

tip-cleanup-tasks
TreeMap<TaskAttemptID, String> cleanupTasks

记录了某个cleanup task在哪个TaskTracker上。

tip-tasks-reported-closed
TreeSet<TaskAttemptID> tasksReportedClosed

满足如下3种条件的Task会被加入到该数据结构中:

  • this.failed) || ((job.getStatus().getRunState() != JobStatus.RUNNING && (job.getStatus().getRunState() != JobStatus.PREP))
  • isComplete() && !(isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid))
  • isCommitPending(taskid) && !shouldCommit(taskid)

该数据结构用来辅助判断,是否一个Task已经完成(成功/失败),需要被TaskTracker终止掉,这个需要JobTracker发送KillTaskAction指令,通知TaskTracker终止该Task运行。

tip-task-to-kill
TreeMap<TaskAttemptID, Boolean> tasksToKill

记录了某个Task是否需要被Kill掉。

tip-machines-where-failed
TreeSet<String> machinesWhereFailed

记录了执行Task失败的TaskTracker的host信息。

JobInProgress数据结构

  • JobID结构

JobID的结构,如下图所示:
jip-job-id
上图中jtIdentifier的值为job,它是组成一个Job的ID字符串的前缀,唯一标识一个Job的完整ID的组成,如下所示:

job_<JobTracker启动时间字符串>_<序号>

例如,一个Job的ID字符串为job_200912121733_0002 。

  • JobProfile结构

JobProfile描述了一个Job的基本信息,它的结构,如下图所示:
jip-job-profile
通过上图可以看出,JobProfile包含了一个Job的如下信息:

标识名称 类型 说明
jobid JobID 唯一标识一个Job的ID,例如:job_200912121733_0002
user String 提交的该Job的所属用户名称,例如:shirdrn
url String 在Web UI页面上查看该Job信息的链接,例如:http://jobtracker.hadoopcluster.com:8080/jobdetails.jsp?jobid=job_200912121733_0002
name String 提交Job的用户为该Job设置的名称字符串,例如:ChainUserEventsJob
jobFile String 该Job所对应的配置文件,例如:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml
queueName String 提交的该Job所在的队列的名称,例如:default
  • 组成Job的Task的信息

一个Job可能包含多个Task(MapTask/ReduceTask),每个Task在JobTracker端使用TaskInProgress结构来跟踪Task的信息,一个Job由下面4组结构来表示这些信息,如下图所示:
jip-tips
上图中出现了4种类型的Task,我们需要明白每种Task的作用是什么。一个Job在调度时,需要分解为上述4种类型的Task,基于类型来说明,一个Job对应的这4种类型的Task的运行顺序为:setup task、MapTask、ReduceTask、cleanup task,其中setup task和cleanup task运行也需要申请slot来运行,map setup运行需要占用Map Slot,而reduce setup运行需要占用Reduce Slot,对于cleanup task也是类似的。
这里说明一下cleanup task和setup task的作用。其实在JobTracker端来看,setup task、cleanup task都与MapTask、ReduceTask使用相同的TaskInProgress数据结构来维护状态。setup task主要是在一个Job开始运行之前,初始化一些状态信息,由于存在MapTask和ReduceTask两种计算型Task,那么对应就存在map setup task和reduce setup task两种setup task。cleanup task主要是在一个Job运行结束后,负责清理在TaskTracker上运行的Task生成的临时数据,更新TaskTracker端维护的相关对象的状态信息,等等,类似地也存在map cleanup task和reduce cleanup task两种cleanup task。

  • JobStatus结构

JobStatus结构定义一个Job的当前状态信息,如下图所示:
jip-job-status
除了定义Job运行状态信息,还包含了其他信息,如下表所示:

标识名称 类型 说明
jobid JobID 唯一标识一个Job的ID,例如:job_200912121733_0002
mapProgress float MapTask运行进度百分比
reduceProgress float ReduceTask运行进度百分比
cleanupProgress float cleanup task运行进度百分比
setupProgress float setup task运行进度百分比
runState int Job运行状态:RUNNING = 1;SUCCEEDED = 2;FAILED = 3;PREP = 4;KILLED = 5;
user String 提交的该Job的所属用户名称,例如:shirdrn
priority JobPriority Job优先级信息,是一个枚举类型,包含如下优先级:VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
schedulingInfo String Job调度信息
jobACLs Map 该Job设置的ACL(访问控制列表)列表信息
  • Counters结构

Counters包含了一组计数器,用来跟踪一个Job运行的信息,结构如下图所示:
jip-counters
每个Job都包含一组Counter计数器,如下表所示:

标识名称 类型
NUM_FAILED_MAPS 失败的MapTask数量
NUM_FAILED_REDUCES 失败的ReduceTask数量
TOTAL_LAUNCHED_MAPS 所有启动的 MapTask数量
TOTAL_LAUNCHED_REDUCES 所有启动的 ReduceTask数量
OTHER_LOCAL_MAPS 其他Local MapTask数量
DATA_LOCAL_MAPS DATA_LOCAL的MapTask数量
NODEGROUP_LOCAL_MAPS NODEGROUP_LOCAL的MapTask数量
RACK_LOCAL_MAPS RACK_LOCAL的MapTask数量
SLOTS_MILLIS_MAPS 被占用的Map slot的“Slot个数 * (结束时间 – 开始时间)”
SLOTS_MILLIS_REDUCES 被占用的Reduce slot:“Slot个数 * (结束时间 – 开始时间)”
FALLOW_SLOTS_MILLIS_MAPS 空闲Map Slot:“(当前时间 – 开始时间) * Slot个数”
FALLOW_SLOTS_MILLIS_REDUCES 空闲Reduce Slot:“(当前时间 – 开始时间) * Slot个数”
  • 其他结构

JobInProgress中使用了大量的集合来维护Job/Task相关的状态信息,具体内容如下表所示:

结构图 说明
jip-non-running-map-cache
Map<Node, List<TaskInProgress>> nonRunningMapCache

JobTracker端维护了某个Node上,没有运行的MapTask列表的信息。
在调度MapTask之前,需要计算某个MapTask将要运行在哪些Node上,这里维护了某个Node所对应的没有运行的MapTask的列表信息。

jip-running-map-cache
Map<Node, Set<TaskInProgress>> runningMapCache

JobTracker端维护了某个Node上,当前正在运行的MapTask列表的信息。

jip-non-local-maps
List<TaskInProgress> nonLocalMaps

JobTracker端维护的、非Local,并且还没有运行的MapTask的列表。

jip-failed-maps
SortedSet<TaskInProgress> failedMaps

JobTracker端维护了失败的MapTask的信息,在该集合中的TaskInProgress基于失败次数降序排序。
当某个MapTask失败以后,就会被放到该集合中,后续重新调度MapTask运行时,会检索该集合。

jip-non-local-running-maps
Set<TaskInProgress> nonLocalRunningMaps

JobTracker端维护的、 非Local、正在运行的MapTask保存在该数据结构中。

jip-non-running-reduces
Set<TaskInProgress> nonRunningReduces

JobTracker端维护的、没有运行的ReduceTask的列表。

jip-map-cleanup-tasks
List<TaskAttemptID> mapCleanupTasks

为MapTask运行的cleanup task列表。

jip-reduce-cleanup-tasks
List<TaskAttemptID> reduceCleanupTasks

为ReduceTask运行的cleanup task列表。

jip-task-completion-events
List<TaskCompletionEvent> taskCompletionEvents

TaskCompletionEvent是用来跟踪Task完成事件的数据结构,该列表结构保存了TaskCompletionEvent。
当一个Task的状态为TaskStatus.State.SUCCEEDED/TaskStatus.State.FAILED/TaskStatus.State.KILLED的时候,会创建一个对应的TaskCompletionEvent对象,根据该对象来更新JobTracker端维护的Task的状态信息。

jip-map-task-id-to-fetch-failures-maps
Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap

TaskTracker发送心跳的时候,会将TaskTracker的状态信息发送给JobTracker,状态信息通过TaskTrackerStatus表示,该对象中包含了Task的报告信息TaskStatus。如果是在运行ReduceTask时,抓取MapTask输出的结果失败时,会在根据Task报告信息,更新JobTracker端维护的mapTaskIdToFetchFailuresMap,记录了Task抓取MapTask输出失败的次数计数信息。

jip-first-task-launch-times
Map<TaskType, Long> firstTaskLaunchTimes

TaskType枚举类型定义如下:

				public enum TaskType {
				  MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP
				}
				

该firstTaskLaunchTimes数据结构保存了某个TaskType类型第一次运行的时间戳信息。

jip-trackers-reserved-for-maps
Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps

FallowSlotInfo包含了空闲的slot信息,主要九个包含了空闲的slot的个数信息。
该数据结构维护了在某个TaskTracker上为MapTask运行所预留的空闲slot的信息。

jip-trackers-reserved-for-reduces
Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces

该数据结构维护了在某个TaskTracker上为ReduceTask运行所预留的空闲slot的信息。

JobTracker数据结构

JobTracker通过在内存中维护有关Job、Task相关的所有信息,来跟踪他们运行、交互过程中所发生的数据交换,等等,如下表所示:

结构图 说明
jt-service-plugins
List<ServicePlugin> plugins

通过ServicePlugin接口,可以基于任意的RPC协议暴露DataNode或NameNode的功能。
通过配置项mapreduce.jobtracker.plugins可以设置ServicePlugin,JobTracker启动的时候会加载初始化配置的ServicePlugin。

jt-job-in-progress-listeners
List<JobInProgressListener> jobInProgressListeners

JobTracker维护了一组JobInProgressListener监听器,在JobTracker运行过程中,发生某些事件会触发注册的JobInProgressListener的执行。比如,JobClient提交一个Job,JobTracker端会触发对应的JobInProgressListener调用jobAdded()初始化该Job;比如,Job执行过程中状态发生变更,会触发JobInProgressListener调用jobUpdated()执行;比如,Job运行完成,会触发obInProgressListener调用jobRemoved()执行。
JobTracker初始化时会创建TaskScheduler,而启动TaskScheduler的时候,会把TaskScheduler所维护的JobInProgressListener添加到jobInProgressListeners列表中。

jt-jobs
Map<JobID, JobInProgress> jobs

JobTracker维护一个JobID->JobInProgress映射的列表,JobID标识一个提交的Job,JobInProgress是JobTracker端维护的Job的所有信息的数据结构。在如下情况下,会检索/操作该jobs数据结构:

  • JobClient提交Job的时候,会创建JobInProgress,并加入到jobs集合中
  • JobClient远程调用Kill掉指定Job的时候,会根据JobID从jobs中获取JobInProgress信息,并Kill掉该Job,更新状态信息
  • JobClient查询当前运行的所有Job信息时,会检索jobs列表
  • 在JobTracker端检索一个Job所维护的Task信息时,会根据JobInProgress所维护的数据结构获取到对应的Task的信息TaskInProgress
  • Job运行状态不为RUNNING,并且也不为PREP,并且完成时间早于当前时间,会将Job从jobs列表删除
  • JobTracker解析接收到的TaskTracker发送的心跳的过程中,会检索并更新jobs列表中的Job信息,找到可以分配给该TaskTracker的属于满足条件的Job所包含的Task
jt-user-to-jobs-map
TreeMap<String, ArrayList<JobInProgress>> userToJobsMap

用来跟踪某个用户提交的需要运行的Job集合的数据结构。
当Job完成(success/failure/killed)后,会在JobTracker内存中保存一些Job,这些Job属于哪些用户的。默认情况下会保存MAX_COMPLETE_USER_JOBS_IN_MEMORY=100个用户的已完成的Job,当超过该值时,会清理掉最早的用户以及对应的完成的Job信息。
可以通过配置项mapred.jobtracker.completeuserjobs.maximum来设置该值。

jt-tracker-to-jobs-to-cleanup
Map<String, Set<JobID>> trackerToJobsToCleanup

用来跟踪某个TaskTracker上运行的Job集合的数据结构。
当一个Job已经运行完成,TaskTracker需要知道哪些运行在该节点上的Job已经完成,并等待通知进行清理,这时会在JobTracker端检索该Map,取出该TaskTracker对应的需要进行清理的Job的集合。
另外,还有一种情况,当JobTracker一段时间内没有收到TaskTracker发送的心跳报告,这时会将该TaskTracker对应的Job集合从trackerToJobsToCleanup中删除,后续会重新调度这些运行在该有问题的TaskTracker上的Task(这些Task属于某些Job,JobTracker分配任务的单位是Task)。

jt-tracker-to-tasks-to-cleanup
Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup

用来跟踪某个TaskTracker上运行的Task集合的数据结构。
当Job运行完成(成功或者失败)后,一个TaskTracker需要知道属于该Job的哪些Task运行在该TaskTracker上,需要对这些Task进行清理。JobTracker端会查询出这类Task,并通过心跳的响应,向对应的TaskTracker发送KillTaskAction指令,通知TaskTracker清理这些Task运行时生成的临时文件等。

jt-task-id-to-tip-map
Map<TaskAttemptID, TaskInProgress> taskidToTIPMap

TaskAttemptID用来标识一个MapTask或一个ReduceTask,通过该数据结构可以根据TaskAttemptID获取到MapTask/ReduceTask的运行信息,也就是TaskInProgress对象。
当需要检索MapTask/ReduceTask,或者对JobTracker端所维护的该Task的状态信息进行更新的时候,需要通过该数据结构获取到。

jt-task-id-to-tracker-map
TreeMap<TaskAttemptID, String> taskidToTrackerMap

维护TaskAttemptID到TaskTracker的映射关系,可以通过一个Task的ID获取到该Task运行在哪个TaskTracker上。

jt-host-name-to-task-tracker
Map<String, Set<TaskTracker>> hostnameToTaskTracker

一台主机上,可能运行着多个TaskTracker进程,该数据结构用来维护host到TaskTracker集合的映射关系。如果一个host被加入了黑名单,则该host上面的所有TaskTracker都无法接收任务。

jt-tracker-to-task-map
TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap

某个TaskTracker上都运行着哪些Task,通过该数据结构来维护这种映射关系。

jt-tracker-to-marked-tasks-map
TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap

在某个TaskTracker上都运行完成了哪些Task,通过该数据结构来维护这种映射关系。

jt-tracker-to-heartbeat-response-map
Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap

TaskTracker会周期性地向JobTracker发送心跳报告,最近一次发送的心跳报告,JobTracker会给其一个响应,最后的这个响应的数据保存在该数据结构中。

jt-host-name-to-node-map
Map<String, Node> hostnameToNodeMap

JobTracker维护了一个网络拓扑结构(NetworkTopology),组成该拓扑结构的是一个一个的Node,每个Node都包含了网络位置信息、继承关系信息、名称等。
每个TaskTracker都是整个Hadoop集群的一个节点,通过该数据结构维护了TaskTracker在集群拓扑结构中相关信息。
比如,根据给定TaskTracker ID,从hostnameToNodeMap中检索出其对应的Node信息,在调度一个Job的MapTask运行时(MapTask运行具有Locality特性),可以基于local、rack-local、off-switch的顺序优先选择前面的Node运行该MapTask。

附录
这里给出文中(文字/图片上)一些缩写词对应的完整名称,如下表所示:

简写词 完整名称
JIP JobInProgress
TIP TaskInProgress
TAID TaskAttemptID
TTID TaskTracker ID
TT HOST TaskTracker Host Name
TCE TaskCompletionEvent
JIPL JobInProgressListener
Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(2): “MapReduce V1:JobTracker端Job/Task数据结构

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>