MapReduce V1:MapTask执行流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。
在文章《MapReduce V1:TaskTracker设计要点概要分析》中我们已经了解了org.apache.hadoop.mapred.Child启动的基本流程,在Child VM启动的过程中会运行MapTask,实际是运行用户编写的MapReduce程序中的map方法中的处理逻辑,我们首先看一下,在Child类中,Child基于TaskUmbilicalProtocol协议与TaskTracker通信,获取到该Child VM需要加载的Task相关数据,包括Task本身,代码如下所示:

    final TaskUmbilicalProtocol umbilical =
      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
        @Override
        public TaskUmbilicalProtocol run() throws Exception { // 建立Child到TaskTracker的RPC连接
          return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
              TaskUmbilicalProtocol.versionID,
              address,
              defaultConf);
        }
    });
... ...
    JvmContext context = new JvmContext(jvmId, pid); // 根据启动Child VM命令行传递的参数构造一个JvmContext对象
... ...
        JvmTask myTask = umbilical.getTask(context); // 基于umbilical获取到一个JvmTask
... ...
        task = myTask.getTask(); // 通过JvmTask获取到MapTask或ReduceTask

上面代码中,JvmTask中就包含了一个Task,也就是task,它可能是MapTask或ReduceTask。
看一下在org.apache.hadoop.mapred.Child中运行Task的基本代码,如下所示:

        // Create a final reference to the task for the doAs block
        final Task taskFinal = task;
        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            try {
              // use job-specified working directory
              FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
              taskFinal.run(job, umbilical);        // 这行是核心代码,运行实际的MapTask或ReduceTask
            } finally {
              TaskLog.syncLogs
                (logLocation, taskid, isCleanup, logIsSegmented(job));
              TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf);
              trunc.truncateLogs(new JVMInfo(
                  TaskLog.getAttemptDir(taskFinal.getTaskID(),
                    taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal)));
            }

            return null;
          }
        });

我们关注执行MapTask,上面,通过调用MapTask的run方法,来实际启动MapTask的运行。

MapTask整体执行流程

MapTask运行的整体流程,如下图所示:
TT.MapTask.run
上面流程比较直观,我们结合MapTask的run方法的代码,来进行分析,代码如下所示:

  @Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    // start thread that will handle communication with parent
    TaskReporter reporter = new TaskReporter(getProgress(), umbilical, jvmContext); // 创建TaskReporter对象
    reporter.startCommunicationThread();
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi); // 初始化MapTask

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter); // 运行JobCleanupTask
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter); // 运行JobSetupTask
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter); // 运行TaskCleanupTask
      return;
    }

    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter); // 运行MapTask
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter); // Task运行完成
  }

上面代码中,run方法的参数TaskUmbilicalProtocol umbilical表示一个RPC代理对象,通过该对象可以与TaskTracker进行通信,从而在Task运行过程中,能够将Task的运行进度、状态信息汇报给TaskTracker。
通过上面看出,启动一个MapTask,可能运行的是JobCleanupTask、JobSetupTask、TaskCleanupTask、Mapper四种Task之中的一种,运行每种Task都会向TaskTracker进行汇报。关于一个MapTask如何被划分的,可以参考JobTracker端在JobInProgress中initTasks()方法。
下面,我们根据MapTask中run方法中的处理流程,分为如下几个子流程,进行详细分析:

初始化Task分析

这里,主要分析在MapTask的run方法中,调用initialize方法的初始化逻辑。先看在调用initialize方法之前,首先创建了一个TaskReporter线程对象,该对象又是基于TaskUmbilicalProtocol umbilical来实现将Task运行状态汇报给TaskTracker。在initialize方法中的初始化流程如下所示:

  1. 根据JobConf job与JobID id,以及TaskReporter,创建一个JobContext对象
  2. 创建一个TaskAttemptContext对象
  3. 如果Task状态为TaskStatus.State.UNASSIGNED,修改为TaskStatus.State.RUNNING
  4. 根据JobConf创建OutputFormat,以及OutputCommitter
  5. 为该Task创建Job的输出目录等内容
  6. 初始化ResourceCalculatorPlugin,用来计算Task运行过程中对节点资源的使用情况

运行JobCleanupTask

执行JobCleanupTask,主要是清理Job运行过程中产生的数据,因为该Task可能上次运行过一次,但是失败,为了下一次重新运行,需要将之前失败Task的数据清理掉。即使Job运行成功,也需要清理MapTask执行后输出的中间结果数据,具体流程,如下图所示:
TT.Task.runJobCleanupTask
上面的序列图比较详细,将Task运行过程中与TaskTracker之间进行通信都描述出来,我们总结如下几个要点:

  • 运行JobCleanupTask首先将当前Task运行阶段设置为CLEANUP,并向TaskTracker汇报状态变更
  • 如果Job的状态是JobStatus.State.FAILED,则删除在该节点上运行Task所产生的临时数据
  • 如果Job的状态是JobStatus.State.KILLED,同样删除临时数据,并在该Job对应的目录下创建_SUCCESS文件,标识Job成功
  • 最后,向TaskTracker汇报状态,更新相关TIP数据结构状态,并释放锁占用的资源,以供其他Task运行所使用

运行JobSetupTask

运行JobSetupTask,主要是初始化Job对应的临时目录,为后续运行Task做准备,具体处理流程,如下图所示:
TT.Task.runJobSetupTask
在创建运行Job的基本临时目录以后,也需要与TaskTracker通信,汇报该Task的运行状态。

运行TaskCleanupTask

TaskCleanupTask与JobCleanupTask类似,主要是清理Task运行过程中产生的一些临时目录和文件,具体流程如下图所示:
TT.Task.runTaskCleanupTask

运行MapTask

MapTask执行的核心逻辑在runNewMapper方法中,该方法中对应的处理流程,如下图所示:
TT.MapTask.runNewMapper
方法runNewMapper的声明,如下所示:

  private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException

其中,JobConf job包含了该Job的配置信息,TaskSplitIndex splitIndex包含了该MapTask所处理的InputSplit的信息(包括splitLocation和startOffset),TaskUmbilicalProtocol umbilical是与TaskTracker通信的代理对象,TaskReporter reporter是一个与TaskTracker通信的线程。
上面的序列图,所描述的具体处理流程,如下所示:

  1. 创建TaskAttemptContext对象
  2. 通过反射,创建Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>对象
  3. 通过反射,创建InputFormat<INKEY,INVALUE>对象
  4. 根据TaskSplitIndex splitIndex ,创建InputSplit对象
  5. 创建RecordReader<INKEY,INVALUE>对象,用来读取输入的InputSplit对应的文件
  6. 创建RecordWriter<K, V>对象,用来将处理后的数据写入文件系统
  7. 创建Mapper.Context对象,可以在编写MapReduce程序中,实现Mapper时使用
  8. 初始化RecordReader<INKEY,INVALUE>对象
  9. 执行Mapper的run方法,调用用户编写的MapReduce程序的Mapper中的处理逻辑,内部循环调用map方法
  10. 回收资源,关闭相关的流对象

下面,我们详细看一下,Mapper处理过程中相关的要点:

  • Mapper.Context结构

Mapper.Context是Mapper类的一个内部类,它包含了运行一个MapTask过程中所需要的所有上下文信息,该类的继承层次结构,如下图所示:
Mapper.Context
可以看出,一个Mapper对应的执行上下文信息,继承了该Mapper对应TaskAttempt的上下文信息,再向上继承了Job的上下文信息,一个Job包含的配置信息都可以被一个Mapper读取到。
MapperContext中包含如下信息:

  private RecordReader<KEYIN,VALUEIN> reader;
  private InputSplit split;

通过使用一个RecordReader,能够读取InputSplit对应的HDFS上的Block文件数据。
TaskInputOutputContext中包含如下信息:

  private RecordWriter<KEYOUT,VALUEOUT> output;
  private StatusReporter reporter;
  private OutputCommitter committer;

可见,在该层能够实现将Task运行的统计,通过StatusReporter以Counter的形式收集,并提供进度(Progress)获取接口。同时,使用RecordWriter将Mapper的输出写入到文件系统。
OutputCommitter是一个比较重要的对象,该抽象类代码,如下所示:

public abstract class OutputCommitter {

  public abstract void setupJob(JobContext jobContext) throws IOException;

  public void commitJob(JobContext jobContext) throws IOException {
    cleanupJob(jobContext);
  }

  @Deprecated
  public void cleanupJob(JobContext context) throws IOException { }

  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
    cleanupJob(jobContext);
  }

  public abstract void setupTask(TaskAttemptContext taskContext) throws IOException;

  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException;

  public abstract void commitTask(TaskAttemptContext taskContext) throws IOException;

  public abstract void abortTask(TaskAttemptContext taskContext) throws IOException;
}

可以参考实现类FileOutputCommitter,它主要负责,在Job运行过程中,管理Task执行过程中对应的文件或目录的信息,如开始运行之前创建目录,运行完成后将临时有用的文件移动到供Job共享的目录下,也