HDFS 写文件过程分析

HDFS 是一个分布式文件系统,在 HDFS 上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在 HDFS 文件系统上创建并写一个文件,流程如下图(来自《Hadoop:The Definitive Guide》一书)所示:
hdfs-write-flow
具体过程描述如下:

  1. Client 调用 DistributedFileSystem 对象的 create 方法,创建一个文件输出流(FSDataOutputStream)对象
  2. 通过 DistributedFileSystem 对象与 Hadoop 集群的 NameNode 进行一次 RPC 远程调用,在 HDFS 的 Namespace 中创建一个文件条目(Entry),该条目没有任何的 Block
  3. 通过 FSDataOutputStream 对象,向 DataNode 写入数据,数据首先被写入 FSDataOutputStream 对象内部的 Buffer 中,然后数据被分割成一个个 Packet 数据包
  4. 以 Packet 最小单位,基于 Socket 连接发送到按特定算法选择的 HDFS 集群中一组 DataNode(正常是 3 个,可能大于等于 1)中的一个节点上,在这组 DataNode 组成的 Pipeline 上依次传输 Packet
  5. 这组 DataNode 组成的 Pipeline 反方向上,发送 ack,最终由 Pipeline 中第一个 DataNode 节点将 Pipeline ack 发送给 Client
  6. 完成向文件写入数据,Client 在文件输出流(FSDataOutputStream)对象上调用 close 方法,关闭流
  7. 调用 DistributedFileSystem 对象的 complete 方法,通知 NameNode 文件写入成功

下面代码使用 Hadoop 的 API 来实现向 HDFS 的文件写入数据,同样也包括创建一个文件和写数据两个主要过程,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static String[] contents = new String[] {
     "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
     "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
     "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
     "dddddddddddddddddddddddddddddddd",
     "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
};
 
public static void main(String[] args) {
     String file = "hdfs://h1:8020/data/test/test.log";
   Path path = new Path(file);
   Configuration conf = new Configuration();
   FileSystem fs = null;
   FSDataOutputStream output = null;
   try {
          fs = path.getFileSystem(conf);
          output = fs.create(path); // 创建文件
          for(String line : contents) { // 写入数据
               output.write(line.getBytes("UTF-8"));
               output.flush();
          }
     } catch (IOException e) {
          e.printStackTrace();
     } finally {
          try {
               output.close();
          } catch (IOException e) {
               e.printStackTrace();
          }
     }
}

结合上面的示例代码,我们先从 fs.create(path); 开始,可以看到 FileSystem 的实现 DistributedFileSystem 中给出了最终返回 FSDataOutputStream 对象的抽象逻辑,代码如下所示:

1
2
3
4
5
6
7
8
9
public FSDataOutputStream create(Path f, FsPermission permission,
  boolean overwrite,
  int bufferSize, short replication, long blockSize,
  Progressable progress) throws IOException {
 
  statistics.incrementWriteOps(1);
  return new FSDataOutputStream
     (dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics);
}

上面,DFSClient dfs 的 create() 方法中创建了一个 OutputStream 对象,在 DFSClient 的 create() 方法:

1
2
3
4
5
6
7
8
9
10
11
  public OutputStream create(String src,
                             FsPermission permission,
                             boolean overwrite,
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize
                             ) throws IOException {
   ... ...
}

创建了一个 DFSOutputStream 对象,如下所示:

1
2
3
final DFSOutputStream result = new DFSOutputStream(src, masked,
    overwrite, createParent, replication, blockSize, progress, buffersize,
    conf.getInt("io.bytes.per.checksum", 512));

下面,我们从 DFSOutputStream 类开始,说明其内部实现原理。

DFSOutputStream 内部原理

打开一个 DFSOutputStream 流,Client 会写数据到流内部的一个缓冲区中,然后数据被分解成多个 Packet,每个 Packet 大小为 64k 字节,每个 Packet 又由一组 chunk 和这组 chunk 对应的 checksum 数据组成,默认 chunk 大小为 512 字节,每个 checksum 是对 512 字节数据计算的校验和数据。
当 Client 写入的字节流数据达到一个 Packet 的长度,这个 Packet 会被构建出来,然后会被放到队列 dataQueue 中,接着 DataStreamer 线程会不断地从 dataQueue 队列中取出 Packet,发送到复制 Pipeline 中的第一个 DataNode 上,并将该 Packet 从 dataQueue 队列中移到 ackQueue 队列中。ResponseProcessor 线程接收从 DataNode 发送过来的 ack,如果是一个成功的 ack,表示复制 Pipeline 中的所有 DataNode 都已经接收到这个 Packet,ResponseProcessor 线程将 packet 从队列 ackQueue 中删除。
在发送过程中,如果发生错误,所有未完成的 Packet 都会从 ackQueue 队列中移除掉,然后重新创建一个新的 Pipeline,排除掉出错的那些 DataNode 节点,接着 DataStreamer 线程继续从 dataQueue 队列中发送 Packet。
下面是 DFSOutputStream 的结构及其原理,如图所示:
hdfs-write-internal
我们从下面 3 个方面来描述内部流程:

  • 创建 Packet

Client 写数据时,会将字节流数据缓存到内部的缓冲区中,当长度满足一个 Chunk大小(512B)时,便会创建一个 Packet 对象,然后向该 Packet 对象中写 Chunk Checksum 校验和数据,以及实际数据块 Chunk Data,校验和数据是基于实际数据块计算得到的。每次满足一个 Chunk 大小时,都会向 Packet 中写上述数据内容,直到达到一个 Packet 对象大小(64K),就会将该 Packet 对象放入到 dataQueue 队列中,等待 DataStreamer 线程取出并发送到 DataNode 节点。

  • 发送 Packet

DataStreamer 线程从 dataQueue 队列中取出 Packet 对象,放到 ackQueue 队列中,然后向 DataNode 节点发送这个 Packet 对象所对应的数据。

  • 接收 ack

发送一个 Packet 数据包以后,会有一个用来接收 ack 的 ResponseProcessor 线程,如果收到成功的ack,则表示一个 Packet 发送成功。如果成功,则 ResponseProcessor 线程会将 ackQueue 队列中对应的 Packet 删除。

DFSOutputStream 初始化

首先看一下,DFSOutputStream 的初始化过程,构造方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    DFSOutputStream(String src, FsPermission masked, boolean overwrite,
        boolean createParent, short replication, long blockSize, Progressable progress,
        int buffersize, int bytesPerChecksum) throws IOException {
      this(src, blockSize, progress, bytesPerChecksum, replication);
 
      computePacketChunkSize(writePacketSize, bytesPerChecksum); // 默认 writePacketSize=64*1024(即 64K),bytesPerChecksum=512(每 512 个字节计算一个校验和),
 
      try {
        if (createParent) { // createParent 为 true 表示,如果待创建的文件的父级目录不存在,则自动创建
          namenode.create(src, masked, clientName, overwrite, replication, blockSize);
        } else {
          namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
        }
      } catch(RemoteException re) {
        throw re.unwrapRemoteException(AccessControlException.class,
                                       FileAlreadyExistsException.class,
                                       FileNotFoundException.class,
                                       NSQuotaExceededException.class,
                                       DSQuotaExceededException.class);
      }
      streamer.start(); // 启动一个 DataStreamer 线程,用来将写入的字节流打包成 packet,然后发送到对应的 DataNode 节点上
    }
上面 computePacketChunkSize() 方法计算了一个 packet 的相关参数,我们结合代码来查看,如下所示:
      int chunkSize = csize + checksum.getChecksumSize();
      int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
      chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
      packetSize = n + chunkSize*chunksPerPacket;

我们用默认的参数值替换上面的参数,得到:

1
2
3
4
int chunkSize = 512 + 4;
int n = 21 + 4;
chunksPerPacket = Math.max((64*1024 - 25 + 516-1)/516, 1);  // 127
packetSize = 25 + 516*127;

上面对应的参数,说明如下表所示:

参数名称 参数值 参数含义
chunkSize 512+4=516 每个 chunk 的字节数(数据+校验和)
csize 512 每个 chunk 数据的字节数
psize 64*1024 每个 packet 的最大字节数(不包含 header)
DataNode.PKT_HEADER_LEN 21 每个 packet 的 header 的字节数
chunksPerPacket 127 组成每个 packet 的 chunk 的个数
packetSize 25+516*127=65557 每个 packet 的字节数(一个 header+一组 chunk)

在计算好一个 packet 相关的参数以后,调用 create() 方法与 NameNode 进行 RPC 通信请求,请求创建文件:

1
2
3
4
5
if (createParent) { // createParent 为 true 表示,如果待创建的文件的父级目录不存在,则自动创建
  namenode.create(src, masked, clientName, overwrite, replication, blockSize);
} else {
  namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
}

远程调用上面方法,会在 FSNamesystem 中创建对应的文件路径,并初始化与该创建的文件相关的一些信息,如租约(向 DataNode 节点写数据的凭据)。文件在 FSNamesystem 中创建成功,就要初始化并启动一个 DataStreamer 线程,用来向 DataNode 写数据,后面我们详细说明具体处理逻辑。

Packet结构与定义

Client 向 HDFS 写数据,数据会被组装成 Packet,然后发送到 DataNode 节点。Packet 分为两类,一类是实际数据包,另一类是 heatbeat 包。一个 Packet 数据包的组成结构,如图所示:
hdfs-write-packet-structure
上图中,一个 Packet 是由 Header 和 Data 两部分组成,其中 Header 部分包含了一个 Packet 的概要属性信息,如下表所示:

字段名称 字段类型 字段长度 字段含义
pktLen int 4 4 + dataLen + checksumLen
offsetInBlock long 8 Packet 在 Block 中偏移量
seqNo long 8 Packet 序列号,在同一个 Block 唯一
lastPacketInBlock boolean 1 是否是一个 Block 的最后一个 Packet
dataLen int 4 dataPos – dataStart,不包含 Header 和 Checksum 的长度

Data 部分是一个 Packet 的实际数据部分,主要包括一个 4 字节校验和(Checksum)与一个 Chunk 部分,Chunk 部分最大为 512 字节。
在构建一个 Packet 的过程中,首先将字节流数据写入一个 buffer 缓冲区中,也就是从偏移量为 25 的位置(checksumStart)开始写 Packet 数据的 Chunk Checksum 部分,从偏移量为 533 的位置(dataStart)开始写 Packet 数据的 Chunk Data 部分,直到一个 Packet创建完成为止。如果一个 Packet 的大小未能达到最大长度,也就是上图对应的缓冲区中,Chunk Checksum 与 Chunk Data 之间还保留了一段未被写过的缓冲区位置,这种情况说明,已经在写一个文件的最后一个 Block 的最后一个 Packet。在发送这个 Packet 之前,会检查 Chunksum 与 Chunk Data 之间的缓冲区是否为空白缓冲区(gap),如果有则将 Chunk Data 部分向前移动,使得 Chunk Data 1 与 Chunk Checksum N 相邻,然后才会被发送到 DataNode 节点。
我们看一下 Packet 对应的 Packet 类定义,定义了如下一些字段:

1
2
3
4
5
6
7
8
9
10
11
ByteBuffer buffer;           // only one of buf and buffer is non-null
byte[]  buf;
long    seqno;               // sequencenumber of buffer in block
long    offsetInBlock;       // 该packet在block中的偏移量
boolean lastPacketInBlock;   // is this the last packet in block?
int     numChunks;           // number of chunks currently in packet
int     maxChunks;           // 一个packet中包含的chunk的个数
int     dataStart;
int     dataPos;
int     checksumStart;
int     checksumPos;

Packet 类有一个默认的没有参数的构造方法,它是用来做 heatbeat 的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Packet() {
  this.lastPacketInBlock = false;
  this.numChunks = 0;
  this.offsetInBlock = 0;
  this.seqno = HEART_BEAT_SEQNO; // 值为 -1
 
  buffer = null;
  int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // 21+4=25
  buf = new byte[packetSize];
 
  checksumStart = dataStart = packetSize;
  checksumPos = checksumStart;
  dataPos = dataStart;
  maxChunks = 0;
}

通过代码可以看到,一个 heatbeat 的内容,实际上只有一个长度为 25 字节的 header 数据。通过 this.seqno = HEART_BEAT_SEQNO; 的值可以判断一个 packet 是否是 heatbeat 包,如果 seqno 为 -1 表示这是一个 heatbeat 包。

Client 发送 Packet 数据

可以 DFSClient 类中看到,发送一个 Packet 之前,首先需要向选定的 DataNode 发送一个 Header 数据包,表明要向 DataNode 写数据,该 Header 的数据结构,如图所示:
hdfs-write-transfer-header
上图显示的是 Client 发送 Packet 到第一个 DataNode 节点的 Header 数据结构,主要包括待发送的 Packet 所在的 Block(先向 NameNode 分配 Block ID 等信息)的相关信息、Pipeline 中另外 2 个 DataNode 的信息、访问令牌(Access Token)和校验和信息,Header 中各个字段及其类型,详见下表:

字段名称 字段类型 字段长度 字段含义
Transfer Version short 2 Client 与 DataNode 之间数据传输版本号,由常量 DataTransferProtocol.DATA_TRANSFER_VERSION 定义,值为 17
OP int 4 操作类型,由常量 DataTransferProtocol.OP_WRITE_BLOCK 定义,值为 80
blkId long 8 Block 的 ID 值,由 NameNode 分配
GS long 8 时间戳(Generation Stamp),NameNode 分配 blkId 的时候生成的时间戳
DNCnt int 4 DataNode 复制 Pipeline中DataNode 节点的数量
Recovery Flag boolean 1 Recover 标志
Client Text Client 主机的名称,在使用 Text 进行序列化的时候,实际包含长度 len 与主机名称字符串 ClientHost
srcNode boolean 1 是否发送 src node 的信息,默认值为 false,不发送 src node 的信息
nonSrcDNCnt int 4 由 Client 写的该 Header 数据,该数不包含 Pipeline 中第一个节点(即为 DNCnt-1)
DN2 DatanodeInfo DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState
DN3 DatanodeInfo DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState
Access Token Token 访问令牌信息,包括 IdentifierLength、Identifier、PwdLength、Pwd、KindLength、Kind、ServiceLength、Service
CheckSum Header DataChecksum 1+4 校验和 Header 信息,包括 type、bytesPerChecksum

Header 数据包发送成功,Client 会收到一个成功响应码(DataTransferProtocol.OP_STATUS_SUCCESS = 0),接着将 Packet 数据发送到 Pipeline 中第一个 DataNode 上,如下所示:

1
2
3
4
5
6
7
8
9
Packet one = null;
one = dataQueue.getFirst(); // regular data packet
ByteBuffer buf = one.getBuffer();
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
 
if (one.lastPacketInBlock) { // 如果是 Block 中的最后一个 Packet,还要写入一个 0 标识该 Block 已经写入完成
    blockStream.writeInt(0); // indicate end-of-block
}

否则,如果失败,则会与 NameNode 进行RPC调用,删除该 Block,并把该 Pipeline 中第一个 DataNode 加入到 excludedNodes 列表中,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
if (!success) {
  LOG.info("Abandoning " + block);
  namenode.abandonBlock(block, src, clientName);
 
  if (errorIndex < nodes.length) {
    LOG.info("Excluding datanode " + nodes[errorIndex]);
    excludedNodes.add(nodes[errorIndex]);
  }
 
  // Connection failed.  Let's wait a little bit and retry
  retry = true;
}

DataNode 端服务组件

数据最终会发送到 DataNode 节点上,在一个 DataNode 上,数据在各个组件之间流动,流程如下图所示:
hdfs-write-pipeline-single
DataNode 服务中创建一个后台线程 DataXceiverServer,它是一个 SocketServer,用来接收来自 Client(或者 DataNode Pipeline 中的非最后一个 DataNode 节点)的写数据请求,然后在 DataXceiverServer 中将连接过来的 Socket 直接派发给一个独立的后台线程 DataXceiver 进行处理。所以,Client 写数据时连接一个 DataNode Pipeline 的结构,实际流程如图所示:
hdfs-write-pipeline-datanodes
每个 DataNode 服务中的 DataXceiver 后台线程接收到来自前一个节点(Client/DataNode)的 Socket 连接,首先读取 Header 数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong());
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress);
int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
boolean isRecovery = in.readBoolean(); // is this part of recovery?
String client = Text.readString(in); // working on behalf of this client
boolean hasSrcDataNode = in.readBoolean(); // is src node info present
if (hasSrcDataNode) {
  srcDataNode = new DatanodeInfo();
  srcDataNode.readFields(in);
}
int numTargets = in.readInt();
if (numTargets < 0) {
  throw new IOException("Mislabelled incoming datastream.");
}
DatanodeInfo targets[] = new DatanodeInfo[numTargets];
for (int i = 0; i < targets.length; i++) {
  DatanodeInfo tmp = new DatanodeInfo();
  tmp.readFields(in);
  targets[i] = tmp;
}
Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);

上面代码中,读取 Header 的数据,与前一个 Client/DataNode 写入 Header 字段的顺序相对应,不再累述。在完成读取 Header 数据后,当前 DataNode 会首先将 Header 数据再发送到 Pipeline 中下一个 DataNode 节点,当然该 DataNode 肯定不是 Pipeline 中最后一个 DataNode 节点。接着,该 DataNode 会接收来自前一个 Client/DataNode 节点发送的 Packet 数据,接收 Packet 数据的逻辑实际上在 BlockReceiver 中完成,包括将来自前一个 Client/DataNode 节点发送的 Packet 数据写入本地磁盘。在 BlockReceiver 中,首先会将接收到的 Packet 数据发送写入到 Pipeline 中下一个 DataNode 节点,然后再将接收到的数据写入到本地磁盘的 Block 文件中。

DataNode 持久化 Packet 数据

在 DataNode 节点的 BlockReceiver 中进行 Packet 数据的持久化,一个 Packet 是一个 Block 中一个数据分组,我们首先看一下,一个 Block 在持久化到磁盘上的物理存储结构,如下图所示:
hdfs-write-block-physical
每个 Block 文件(如上图中 blk_1084013198 文件)都对应一个 meta 文件(如上图中 blk_1084013198_10273532.meta 文件),Block 文件是一个一个 Chunk 的二进制数据(每个 Chunk 的大小是 512 字节),而 meta 文件是与每一个 Chunk 对应的 Checksum 数据,是序列化形式存储。

写文件过程中 Client/DataNode 与 NameNode 进行 RPC 调用

Client 在 HDFS 文件系统中写文件过程中,会发生多次与 NameNode 节点进行 RPC 调用来完成写数据相关操作,主要是在如下时机进行 RPC 调用:

  • 写文件开始时创建文件:Client 调用 create 在 NameNode 节点的 Namespace 中创建一个标识该文件的条目
  • 在 Client 连接 Pipeline 中第一个 DataNode 节点之前,Client 调用 addBlock 分配一个Block(blkId+DataNode 列表+租约)
  • 如果与 Pipeline 中第一个 DataNode 节点连接失败,Client 调用abandonBlock 放弃一个已经分配的 Block
  • 一个 Block 已经写入到 DataNode 节点磁盘,Client 调用 fsync 让 NameNode 持久化 Block 的位置信息数据
  • 文件写完以后,Client 调用 complete 方法通知 NameNode 写入文件成功
  • DataNode 节点接收到并成功持久化一个 Block 的数据后,DataNode 调用 blockReceived 方法通知 NameNode 已经接收到 Block

具体 RPC 调用的详细过程,可以参考源码。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(26): “HDFS 写文件过程分析

  1. 非常受教!不过楼主我两个问题想请教一下:1)看过很多资料都说各个数据节点都是pipeline的,但是假设副本数为3,数据节点为10,那样的话并不是每一个节点都存有同一个块的数据的对吧?如果是这样的话按照这种水龙头一样的ack确认是怎样的布局的呢?或者说各个数据节点真的是一个接连着一个的么?2)一个文件传到集群的时候,是首先文件的全部数据块传到某一个设为primary的数据节点,然后再决定把副本发送给另外哪些节点的么?
    期待您的回答!
    ps:读了您的几篇文章,非常有帮助!希望楼主坚持写下去!

    • 明白下面几点,你就应该知道是怎么回事了:
      1、数据在HDFS上存储的基本单位是Block,默认大小64M;
      2、数据在Client和DataNode之间传输数据的基本单位是Packet,默认最大为65557B;
      3、数据传输Pipeline,宏观上看是Block Pipeline,但是微观上其实是Packet Pipeline。

      • 谢谢楼主,我觉得您的回答解决了我第一个问题,也就是针对每一个block,namenode给出了可以写入的几个datanode的列表,这几个datanode形成传输pipeline,以packet为单位传输数据,是么?
        但是第二个问题我还有点疑问,我对您的回答的理解是,在传输数据的时候是对每个块都选择一个作为primary的数据节点,而不是对一个文件来选择一个作为primary的数据节点,是么?

        • 肯定是以Block为单位,选择DataNode列表去存储数据。
          在向HDFS写文件的时候,对于每一个Block(DFSClient会计算什么时候够一个Block大小),DFSClient都需要向NameNode进行一次RCP通信,申请分配(allocate)Block,而NameNode会返回一个可用的DataNode列表,DFSClient将该Block写入到这个DataNode列表中节点上。
          建议读读源码。

  2. 你好,看了几篇hadoop写的文章之后, 看看我的理解是否有些偏差。

    1. Client在向hadoop写数据, 把数据封装成packet放入到dataQueue队列中,通过DataStreamer线程进行发送,假设副本为3
    数据发送到DataNode-1中,然后由DataNode-1将数据传递到DataNode-2,DataNode-3。等待响应线程处理,再次逆流而上
    DataNode-3响应 -> DataNode-2,最终DataNode-1响应回去客户端。

    上边其实就是备份一些操作,假设10就往10个节点写。当每次写完一个blockSize大小的时候就会清除(这里有段代码控制,有点忘记了)。

    在写数据的时候,其实是往本地写。

    • 嗯,大体逻辑差不多是这样的。后面你说“清除”,应该是每写一个block之前会向NameNode发送RPC请求申请分配block,写完之后如果还有数据,再次申请下一个block。

  3. 请问“基于Socket连接发送到按特定算法选择的HDFS集群中一组DataNode”这个具体实现在哪一部分代码里?

    • RPC调用NameNode的addBlock方法时,使用BlockPlacementPolicy(默认实现BlockPlacementPolicyDefault)来选择(方法chooseTarget)待分配的block放置在哪些节点上,可以查看源码。

  4. 楼主你好,请教几个问题。
    1:如果默认block是64m,但文件很小比如10m。那么存储这个文件的block是只有10m,还是可以继续用别的小文件填满64m?
    2:如果1是只有10m,是不是如果我的文件都小于blocksize,那么实际的block大小就都小于blocksize,这个值就是一个block允许的最大值?
    3:如果1可以继续填满,能简单阐述下原理吗?
    4:如果我的文件远小于64m,那么会对磁盘寻址效率有怎样影响?或者说对集群效率有怎样负面影响?除了namenode维护的block列表多了以外~
    问题问的比较肤浅~
    麻烦楼主方便的时候解答下~
    多谢~

    • 一个文件小于blocksize,那就一个文件一个block,不会追加。
      进行MR计算的时候,每个split要启动一个Map,如果小文件太多,启动Map的开销可能比实际计算这个split的开销还要大。

      • 感谢耐心解答~
        追加个问题~
        hdfs的一个block的数据在磁盘上是连续存储的吗?
        如果不是,连续存储的单位是os的block吗?

        • 是连续的,你可以看一下磁盘存储目录下面类似blk_1082986999这样名字的文件,这就是一个block的数据

  5. 您好,想请教您一个问题:block的地址能否获取,就是能用用IO流处理额那种地址。我实验了split.getpath 和 getlocation都不是我想要的地址。谢谢!

      • 谢谢回复。getFileBlockLocations方法好像只能查到文件所有的block处在那个哪个datanode上。我想如果能在程序里得到当前block的ID,加上我自定义的hadoop/data/ 目录,就能组成一个路径。怎么得到当前的blockId呢,我不太会写啊,谢谢您~

        • 我经验好像是会生成在配置文件中配置的路径下,比如/tmp/../blockId,就是实际上这个路径是你自己配置好了的,但是这个blockID应该不是那么容易得到的吧,或者说不一定是你想要的那个块的

  6. 赞博主~

    再请教博主一个问题,我在往hdfs里写文件的时候发现这样一个现象:

    当写文件的程序运行完成的时候,系统监控里依然能看到在写磁盘的操作,再过一会才会停止。此时程序已经执行完成,在hdfs里也能看到写入的文件,但写磁盘的操作仍在进行中。

    这个现象总是出现的。

    我猜测是虽然程序已经执行完成,在hdfs里也能看到写入的文件,但是还是有block的备份未被写完。
    不知我的猜测是否成立呢

  7. 你好,我最近在做一个hdfs读写性能测试的实验,包括降级读的过程。Hadoop自带的测试工具无法测试出降级读的吞吐率,所以需要人工分析。
    对于hdfs的文件写入过程,可以理解为从调用filterFileSystem类中的create方法开始,到调用该类中的close方法结束吗?
    此外,对于hdfs的文件读取过程,可以理解为从调用DFSClient类中的open方法开始,到调用该类的close方法结束吗?

    • 调用create方法是创建一个文件,并返回一个stream,调用stream的write方法开始写入文件。调用close,完成写文件。
      读的过程也是类似的。

      • 好的,谢谢博主,我之前有点没说清楚。
        我主要是在做一个Hadoop读写性能的对比试验。具体来说,主要是想对比Hadoop0.20.203原始版本跟Facebook二次开发的hdfs raid版本的读写性能差异。在《Hadoop 权威指南》一书上提到,对于hdfs的文件写入过程,客户端是调用DistributedFileSystem类中的create方法开始。在跟同学讨论时,同学说在hdfs raid那个版本中,对于hdfs的文件写入过程,是从调filterFileSystem类中的create方法开始,我也不确定,所以想向博主求证一下。
        在明确了读写流程后,才可以修改下代码,加入一个时间的统计。

  8. 博主你好,我有一个疑问就是,假设客户端为非集群上的机器,然后向集群写入数据的时候,在Pipeline中,上游的datanode在接收到每个packet的时候,是会等到管道中最后一个datanode验证了这个数据包的检验和,并且返回的ack表示正确的时候将数据写入到磁盘呢。还是怎么的???

  9. 博主你好,我在做一个zipOutputStream写入hdfs的流式写入,将FSDataOutputStream包在zipOutputStream中,想中间加缓冲流来着但是感觉FSDataOutputSream中有缓冲区,因此是不是可以不用加这个缓冲流?io.file.buffer.size和dfs.client-write-packet-size这两个参数,第一个是不是你说的内部缓冲区的大小?如果要加快写入的话参数应该怎样抉择呢。。

Yanjun进行回复 取消回复

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>