HDFS源码分析心跳汇报之数据块增量汇报

《HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程》一文中,我们详细了解了数据节点DataNode周期性发送心跳给名字节点NameNode的BPServiceActor工作线程,了解了它实现心跳的大体流程:

1、与NameNode握手:

1.1、第一阶段:获取命名空间信息并验证、设置;

1.2、第二阶段:DataNode注册;

2、周期性调用sendHeartBeat()方法发送心跳信息,并处理来自心跳响应中的命令;

3、调用reportReceivedDeletedBlocks()方法发送数据库增量汇报:包括正在接收的、已接收的和已删除的数据块;

4、调用blockReport()方法周期性进行数据块汇报,并处理返回的相关命令。

本文,我们重点讲解下其中的第三步:调用reportReceivedDeletedBlocks()方法发送数据库增量汇报:包括正在接收的、已接收的和已删除的数据块。

首先,这个数据块增量汇报是什么情况下发生的呢?在DataNode与NameNode握手并注册后实现心跳的offerService()方法的while循环内,有这么一段代码,如下:

  1. // 如果标志位sendImmediateIBR为true,或者数据块增量汇报时间已到,
  2. // 数据块增量汇报时间间隔是心跳时间间隔的100倍,默认情况下是5分钟
  3. if (sendImmediateIBR ||
  4. (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
  5. // 调用reportReceivedDeletedBlocks()方法发送数据块增量汇报
  6. reportReceivedDeletedBlocks();
  7. // 设置上次数据块增量汇报时间lastDeletedReport为startTime
  8. lastDeletedReport = startTime;
  9. }

首先,这个sendImmediateIBR是一个标志位,它标识着是否立即发送一个数据块增量汇报,在BPServiceActor工作线程初始化时默认为false。而数据块增量汇报是否发送,这里有两个条件,只要满足其中一个即可发送数据块增量汇报:

1、是否立即发送一个数据块增量汇报的标志位sendImmediateIBR为true;

2、数据块增量汇报的时间间隔已到:数据块增量汇报的时间间隔是心跳时间间隔的100倍,默认情况下是5分钟。

在讲解reportReceivedDeletedBlocks()方法前,我们先看BPServiceActor工作线程的一个成员变量,定义如下:

  1. /**
  2. * Between block reports (which happen on the order of once an hour) the
  3. * DN reports smaller incremental changes to its block list. This map,
  4. * keyed by block ID, contains the pending changes which have yet to be
  5. * reported to the NN. Access should be synchronized on this object.
  6. *
  7. * 在数据块汇报(通常一小时一次)之间,DataNode会汇报其数据块列表的增量变化情况。
  8. * 这个Map,包含尚未汇报给NameNode的DataNode上数据块正在发生的变化。
  9. * 访问它必须使用synchronized关键字。
  10. */
  11. private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
  12. pendingIncrementalBRperStorage = Maps.newHashMap();

先说下这个pendingIncrementalBRperStorage变量对应的数据结构,它是一个Map,key为DatanodeStorage类型,value为PerStoragePendingIncrementalBR类型。而这个PerStoragePendingIncrementalBR类型在其内部封装了一个叫做pendingIncrementalBR的HashMap,key为blockId,value为ReceivedDeletedBlockInfo,ReceivedDeletedBlockInfo对Block做了一层封装了,它标识了对应Block在DataNode上的状态BlockStatus,BlockStatus是一个枚举类,包含的Block状态分别有正在接收的数据块RECEIVING_BLOCK(1)、已经接收的数据块RECEIVED_BLOCK(2)、已被删除的数据块DELETED_BLOCK(3)三种状态。

也就是说,pendingIncrementalBRperStorage实际上存储了DataNode上每个DatanodeStorage到对应的增量数据块集合的映射关系,而这个增量数据块,包含正在接收的、已接受的和已删除的。

在数据块汇报(通常一小时一次)之间,DataNode会汇报其数据块列表的增量变化情况,这个是作为一个小的(smaller)汇报进行的。这个Map,包含尚未汇报给NameNode的DataNode上数据块正在发生的变化,访问它必须使用synchronized关键字。而这个数据块增量汇报,其主要目的就应该是尽早让名字节点NameNode了解数据节点DataNode上数据块的变化情况,而不是通过正常的每小时一次的数据块汇报来告知名字节点,那样的话对于整个文件系统来说,是很被动的一见事。

好了,我们再看下reportReceivedDeletedBlocks()方法,它是完成数据块增量汇报的核心方法,代码如下:

  1. /**
  2. * Report received blocks and delete hints to the Namenode for each
  3. * storage.
  4. *
  5. * @throws IOException
  6. */
  7. private void reportReceivedDeletedBlocks() throws IOException {
  8. // Generate a list of the pending reports for each storage under the lock
  9. // 创建一个存储StorageReceivedDeletedBlocks的ArrayList列表reports,
  10. // 大小为pendingIncrementalBRperStorage的大小
  11. // StorageReceivedDeletedBlocks是对DatanodeStorage和ReceivedDeletedBlockInfo数组的一个封装,
  12. // 实际上就是将pendingIncrementalBRperStorage由Map转换为List列表形式
  13. ArrayList<StorageReceivedDeletedBlocks> reports =
  14. new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
  15. // 使用synchronized对pendingIncrementalBRperStorage进行同步:
  16. synchronized (pendingIncrementalBRperStorage) {
  17. // 遍历pendingIncrementalBRperStorage
  18. for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
  19. pendingIncrementalBRperStorage.entrySet()) {
  20. // 取出每个DatanodeStorage、PerStoragePendingIncrementalBR进行处理
  21. final DatanodeStorage storage = entry.getKey();
  22. final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
  23. // 如果perStorageMap中存在发生变化的数据块:
  24. if (perStorageMap.getBlockInfoCount() > 0) {
  25. // Send newly-received and deleted blockids to namenode
  26. // 发送新接收的或者已删除的数据块ID给NameNode
  27. // 从perStorageMap中获得ReceivedDeletedBlockInfo数组
  28. ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
  29. // 将根据DatanodeStorage和ReceivedDeletedBlockInfo数组构造的StorageReceivedDeletedBlocks加入reports列表
  30. reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
  31. }
  32. }
  33. // 立即汇报的标志位sendImmediateIBR设置为false
  34. sendImmediateIBR = false;
  35. }
  36. if (reports.size() == 0) {// reports大小为0的话,直接返回null
  37. // Nothing new to report.
  38. return;
  39. }
  40. // Send incremental block reports to the Namenode outside the lock
  41. // 发送是否成功的标志位success初始化为false
  42. boolean success = false;
  43. try {
  44. // 通过NameNode代理的blockReceivedAndDeleted()方法,将新接收的或者已删除的数据块汇报给NameNode,汇报的信息包括:
  45. // 1、数据节点注册信息DatanodeRegistration;
  46. // 2、数据块池ID;
  47. // 3、需要汇报的数据块及其状态信息列表StorageReceivedDeletedBlocks;
  48. bpNamenode.blockReceivedAndDeleted(bpRegistration,
  49. bpos.getBlockPoolId(),
  50. reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
  51. // 发送是否成功的标志位success设置为true
  52. success = true;
  53. } finally {
  54. if (!success) {// 汇报不成功的话
  55. synchronized (pendingIncrementalBRperStorage) {
  56. for (StorageReceivedDeletedBlocks report : reports) {
  57. // If we didn't succeed in sending the report, put all of the
  58. // blocks back onto our queue, but only in the case where we
  59. // didn't put something newer in the meantime.
  60. // 将数据块再放回到perStorageMap
  61. PerStoragePendingIncrementalBR perStorageMap =
  62. pendingIncrementalBRperStorage.get(report.getStorage());
  63. perStorageMap.putMissingBlockInfos(report.getBlocks());
  64. // 立即汇报的标志位sendImmediateIBR设置为true
  65. sendImmediateIBR = true;
  66. }
  67. }
  68. }
  69. }
  70. }

这个reportReceivedDeletedBlocks()方法的大致处理流程如下:

1、创建一个存储StorageReceivedDeletedBlocks的ArrayList列表reports:

大小为pendingIncrementalBRperStorage的大小。StorageReceivedDeletedBlocks是对DatanodeStorage和ReceivedDeletedBlockInfo数组的一个封装,实际上就是将pendingIncrementalBRperStorage由Map转换为List列表形式;

2、使用synchronized对pendingIncrementalBRperStorage进行同步,遍历pendingIncrementalBRperStorage:

2.1、取出每个DatanodeStorage、PerStoragePendingIncrementalBR进行处理;

2.2、如果perStorageMap中存在发生变化的数据块,发送新接收的或者已删除的数据块ID给NameNode:

2.2.1、从perStorageMap中获得ReceivedDeletedBlockInfo数组;

2.2.3、将根据DatanodeStorage和ReceivedDeletedBlockInfo数组构造的StorageReceivedDeletedBlocks加入reports列表;

3、立即汇报的标志位sendImmediateIBR设置为false;

4、reports大小为0的话,直接返回null;

5、发送是否成功的标志位success初始化为false;

6、通过NameNode代理bpNamenode的blockReceivedAndDeleted()方法,将新接收的或者已删除的数据块汇报给NameNode,汇报的信息包括:

6.1、数据节点注册信息DatanodeRegistration;

6.2、数据块池ID;

6.3、需要汇报的数据块及其状态信息列表StorageReceivedDeletedBlocks;

7、 发送是否成功的标志位success设置为true;

8、汇报不成功的话,遍历reports:

8.1、将数据块再放回到perStorageMap;

8.2、立即汇报的标志位sendImmediateIBR设置为true。

针对上述流程,我们先说下是否应立即汇报增量数据块信息的标志位sendImmediateIBR。当BPServiceActor工作线程创建时,这个标志位默认为false,即不会立即发送数据块增量汇报,而是周期性的到期才会发送。而当该发送数据块增量汇报时,无论标志位之前为true还是false,统一设置为false,因为此时数据块增量汇报已经发送了,下次没必要再立即发送了。而只有当数据块增量汇报不成功时,该标志位才会被设置为true,以便下次循环直接发送之前未成功的数据块增量汇报,而不用管数据块增量汇报的时间间隔是否到期。这个标志位就是为了在数据块增量汇报失败的情况下,下次循环中能立即发送出去,以便让NameNode及时了解DataNode数据块情况。

那么,数据块增量汇报是如何发送给NameNode的呢?我们先看下NameNode在DataNode上的代理bpNamenode,它的定义如下:

  1. DatanodeProtocolClientSideTranslatorPB bpNamenode;

它是BPServiceActor线程中一个DatanodeProtocolClientSideTranslatorPB类型的变量,也就意味着每个与NameNode通讯的BPServiceActor工作线程,都持有一个NameNode的代理,其初始化是在BPServiceActor工作线程与NameNode连接时完成的,我们看下DatanodeProtocolClientSideTranslatorPB类中完成数据块增量汇报的blockReceivedAndDeleted()方法,代码如下:

  1. @Override
  2. public void blockReceivedAndDeleted(DatanodeRegistration registration,
  3. String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
  4. throws IOException {
  5. BlockReceivedAndDeletedRequestProto.Builder builder =
  6. BlockReceivedAndDeletedRequestProto.newBuilder()
  7. .setRegistration(PBHelper.convert(registration))
  8. .setBlockPoolId(poolId);
  9. for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) {
  10. StorageReceivedDeletedBlocksProto.Builder repBuilder =
  11. StorageReceivedDeletedBlocksProto.newBuilder();
  12. repBuilder.setStorageUuid(storageBlock.getStorage().getStorageID());  // Set for wire compatibility.
  13. repBuilder.setStorage(PBHelper.convert(storageBlock.getStorage()));
  14. for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) {
  15. repBuilder.addBlocks(PBHelper.convert(rdBlock));
  16. }
  17. builder.addBlocks(repBuilder.build());
  18. }
  19. try {
  20. // 通过实现了DatanodeProtocolPB接口的blockReceivedAndDeleted()方法发送的
  21. // rpcProxy最终加载的是参数rpc.engine.DatanodeProtocolPB配置的类
  22. rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build());
  23. } catch (ServiceException se) {
  24. throw ProtobufHelper.getRemoteException(se);
  25. }
  26. }

而rpcProxy最终加载的是参数rpc.engine.DatanodeProtocolPB配置的类,实际上也就是DatanodeProtocolServerSideTranslatorPB类,由它负责向NamNode发送RPC请求,而NameNode对应RPC请求处理的方法在NameNodeRpcServer类中的blockReceivedAndDeleted()方法,代码如下:

  1. @Override // DatanodeProtocol
  2. public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
  3. StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
  4. verifyRequest(nodeReg);
  5. metrics.incrBlockReceivedAndDeletedOps();
  6. if(blockStateChangeLog.isDebugEnabled()) {
  7. blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
  8. +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
  9. +" blocks.");
  10. }
  11. for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
  12. // 最终遍历StorageReceivedDeletedBlocks数组,针对每个StorageReceivedDeletedBlocks,
  13. // 调用FSNamesystem的processIncrementalBlockReport()方法进行处理
  14. namesystem.processIncrementalBlockReport(nodeReg, r);
  15. }
  16. }

最终遍历StorageReceivedDeletedBlocks数组,针对每个StorageReceivedDeletedBlocks,调用FSNamesystem的processIncrementalBlockReport()方法进行处理。ok,继续追踪,如下:

  1. public void processIncrementalBlockReport(final DatanodeID nodeID,
  2. final StorageReceivedDeletedBlocks srdb)
  3. throws IOException {
  4. <span style="white-space:pre">    </span>
  5. <span style="white-space:pre">    </span>// 典型的写锁模式
  6. <span style="white-space:pre">    </span>
  7. <span style="white-space:pre">    </span>// 获取写锁
  8. writeLock();
  9. try {
  10. <span style="white-space:pre">  </span>
  11. // 调用BlockManager的processIncrementalBlockReport()方法处理数据块增量汇报
  12. blockManager.processIncrementalBlockReport(nodeID, srdb);
  13. } finally {
  14. <span style="white-space:pre">  </span>
  15. // 释放写锁
  16. writeUnlock();
  17. }
  18. }

FSNamesystem的processIncrementalBlockReport()方法是典型的一个读写锁中写锁模式,获取写锁,try模块中处理业务逻辑,finally模块中释放写锁。而业务逻辑的处理,则是通过调用BlockManager的processIncrementalBlockReport()方法来完成的。FSNamesystem相当于名字节点NameNod门面模式中的门面,由它负责一切文件系统操作相关的处理。而BlockManager则是名字节点NameNode中针对所有block状态保持、变更处理等的大管家,我们会在后续文章后陆续介绍这两个重要的变量。

好吧,我们先看下BlockManager的processIncrementalBlockReport()方法,代码如下:

  1. /**
  2. * The given node is reporting incremental information about some blocks.
  3. * This includes blocks that are starting to be received, completed being
  4. * received, or deleted.
  5. *
  6. * This method must be called with FSNamesystem lock held.
  7. */
  8. public void processIncrementalBlockReport(final DatanodeID nodeID,
  9. final StorageReceivedDeletedBlocks srdb) throws IOException {
  10. assert namesystem.hasWriteLock();
  11. int received = 0;
  12. int deleted = 0;
  13. int receiving = 0;
  14. final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
  15. if (node == null || !node.isAlive) {
  16. blockLog
  17. .warn("BLOCK* processIncrementalBlockReport"
  18. + " is received from dead or unregistered node "
  19. + nodeID);
  20. throw new IOException(
  21. "Got incremental block report from unregistered or dead node");
  22. }
  23. DatanodeStorageInfo storageInfo =
  24. node.getStorageInfo(srdb.getStorage().getStorageID());
  25. if (storageInfo == null) {
  26. // The DataNode is reporting an unknown storage. Usually the NN learns
  27. // about new storages from heartbeats but during NN restart we may
  28. // receive a block report or incremental report before the heartbeat.
  29. // We must handle this for protocol compatibility. This issue was
  30. // uncovered by HDFS-6094.
  31. storageInfo = node.updateStorage(srdb.getStorage());
  32. }
  33. // 取出每个ReceivedDeletedBlockInfo进行处理
  34. for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
  35. switch (rdbi.getStatus()) {
  36. case DELETED_BLOCK:// 如果是已被删除的数据块
  37. // 调用removeStoredBlock()方法在NameNode中移除node对应数据块元信息
  38. removeStoredBlock(rdbi.getBlock(), node);
  39. // 计数器deleted加1
  40. deleted++;
  41. break;
  42. case RECEIVED_BLOCK:// 如果是已接收的数据块
  43. // 调用addBlock()方法在NameNode中添加数据块元信息
  44. addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
  45. // 计数器received加1
  46. received++;
  47. break;
  48. case RECEIVING_BLOCK:// 如果是正在接收的数据块
  49. // 计数器receiving加1
  50. receiving++;
  51. // 调用processAndHandleReportedBlock()方法在NameNode中处理正在接收的数据块
  52. processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
  53. ReplicaState.RBW, null);
  54. break;
  55. default:
  56. String msg =
  57. "Unknown block status code reported by " + nodeID +
  58. ": " + rdbi;
  59. blockLog.warn(msg);
  60. assert false : msg; // if assertions are enabled, throw.
  61. break;
  62. }
  63. if (blockLog.isDebugEnabled()) {
  64. blockLog.debug("BLOCK* block "
  65. + (rdbi.getStatus()) + ": " + rdbi.getBlock()
  66. + " is received from " + nodeID);
  67. }
  68. }
  69. blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
  70. + nodeID + " receiving: " + receiving + ", " + " received: " + received
  71. + ", " + " deleted: " + deleted);
  72. }

整个逻辑非常清晰,取出每个ReceivedDeletedBlockInfo进行处理:

1、如果是已被删除的数据块:

1.1、调用removeStoredBlock()方法在NameNode中移除node对应数据块元信息;

1.2、计数器deleted加1;

2、如果是已接收的数据块:

2.1、调用addBlock()方法在NameNode中添加数据块元信息;

2.2、计数器received加1;

3、如果是正在接收的数据块:

3.1、计数器receiving加1;

3.2、调用processAndHandleReportedBlock()方法在NameNode中处理正在接收的数据块。

至于NameNode的BlockManager到底是何如处理的,我们留到以后分析NameNode和BlockManager时再做详细分析吧!

这里做个简单总结:

数据块增量汇报是负责向NameNode发送心跳信息工作线程BPServiceActor中周期性的一个工作,它负责向NameNode及时汇报DataNode节点上数据块的变化情况,比如数据块正在接收、已接收或者已被删除。它的工作周期要小于正常的数据块汇报,目的就是为了能够让NameNode及时掌握DataNode上数据块变化情况,以便HDFS系统运行正常,略显机智!而且,当数据块增量汇报不成功时,下一个循环会接着立即发送数据块增量汇报,而不是等其下一个周期的到来,这显示了HDFS良好的容错性,是一个值得我们借鉴的设计方法。

上一篇:Java多线程——ThreadLocal类


下一篇:HDFS源码分析EditLog之获取编辑日志输入流