HDFS源码分析心跳汇报之数据块汇报

在《HDFS源码分析心跳汇报之数据块增量汇报》一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode。那么,时间间隔更长的正常数据块汇报都做了些什么呢?本文,我们将开始研究下时间间隔更长的正常数据块汇报。

首先,看下正常数据块汇报是如何发起的?我们先看下BPServiceActor工作线程的offerService()方法:

  1. /**
  2. * Main loop for each BP thread. Run until shutdown,
  3. * forever calling remote NameNode functions.
  4. */
  5. private void offerService() throws Exception {
  6. //
  7. // Now loop for a long time....
  8. //
  9. while (shouldRun()) {// 又是一个利用shouldRun()判断的while循环
  10. try {
  11. // 省略部分代码
  12. ...
  13. // 调用blockReport()方法,进行数据块汇报,放返回来自名字节点NameNode的相关命令cmds
  14. List<DatanodeCommand> cmds = blockReport();
  15. // 调用processCommand()方法处理来自名字节点NameNode的相关命令cmds
  16. processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
  17. // 省略部分代码
  18. //
  19. // There is no work to do;  sleep until hearbeat timer elapses,
  20. // or work arrives, and then iterate again.
  21. // 计算等待时间waitTime:心跳时间间隔减去上次心跳后截至到现在已过去的时间
  22. long waitTime = dnConf.heartBeatInterval -
  23. (Time.now() - lastHeartbeat);
  24. synchronized(pendingIncrementalBRperStorage) {
  25. if (waitTime > 0 && !sendImmediateIBR) {// 如果等待时间大于0,且不是立即发送数据块增量汇报
  26. try {
  27. // 利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步
  28. pendingIncrementalBRperStorage.wait(waitTime);
  29. } catch (InterruptedException ie) {
  30. LOG.warn("BPOfferService for " + this + " interrupted");
  31. }
  32. }
  33. } // synchronized
  34. } catch(RemoteException re) {
  35. <pre name="code" class="java">       // 省略部分代码

} catch (IOException e) {

  1. // 省略部分代码

} } // while (shouldRun())


可以看出,在BPServiceActor工作线程offerService()方法的while循环内,数据块汇报blockReport()方法执行时,仅有下面的waitTime的等待时间,其他情况下都是立即执行的。那么等待时间waitTime是如何计算的呢?它就是心跳时间间隔减去上次心跳后截至到现在已过去的时间,并且,如果等待时间waitTime大于0,且不是立即发送数据块增量汇报(标志位sendImmediateIBR为false),那么才会利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步。在这里,我们就可以大胆猜测,数据块汇报的时间间隔应该是大于心跳时间间隔的,并且两者之间的距离肯定不小。

那么,我们开始研究实现正常数据块汇报的blockReport()方法吧,代码如下:

  1. /**
  2. * Report the list blocks to the Namenode
  3. * @return DatanodeCommands returned by the NN. May be null.
  4. * @throws IOException
  5. */
  6. List<DatanodeCommand> blockReport() throws IOException {
  7. // send block report if timer has expired.
  8. // 到期就发送数据块汇报
  9. // 取当前开始时间startTime
  10. final long startTime = now();
  11. // 如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null,
  12. // 数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时
  13. if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
  14. return null;
  15. }
  16. // 构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommand
  17. ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
  18. // Flush any block information that precedes the block report. Otherwise
  19. // we have a chance that we will miss the delHint information
  20. // or we will report an RBW replica after the BlockReport already reports
  21. // a FINALIZED one.
  22. // 调用reportReceivedDeletedBlocks()方法发送数据块增量汇报
  23. reportReceivedDeletedBlocks();
  24. // 记录上次数据块增量汇报时间lastDeletedReport
  25. lastDeletedReport = startTime;
  26. // 设置数据块汇报起始时间brCreateStartTime为当前时间
  27. long brCreateStartTime = now();
  28. // 从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists,
  29. // key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongs
  30. Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
  31. dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
  32. // Convert the reports to the format expected by the NN.
  33. int i = 0;
  34. int totalBlockCount = 0;
  35. // 创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小
  36. StorageBlockReport reports[] =
  37. new StorageBlockReport[perVolumeBlockLists.size()];
  38. // 遍历perVolumeBlockLists
  39. for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
  40. // 取出value:BlockListAsLongs
  41. BlockListAsLongs blockList = kvPair.getValue();
  42. // 将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports,
  43. // StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组
  44. reports[i++] = new StorageBlockReport(
  45. kvPair.getKey(), blockList.getBlockListAsLongs());
  46. // 累加数据块数目totalBlockCount
  47. totalBlockCount += blockList.getNumberOfBlocks();
  48. }
  49. // Send the reports to the NN.
  50. int numReportsSent;
  51. long brSendStartTime = now();
  52. // 根据数据块总数目判断是否需要多次发送消息
  53. if (totalBlockCount < dnConf.blockReportSplitThreshold) {// 如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送
  54. // split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000
  55. // Below split threshold, send all reports in a single message.
  56. // 发送的数据块汇报消息数numReportsSent设置为1
  57. numReportsSent = 1;
  58. // 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息
  59. DatanodeCommand cmd =
  60. bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
  61. // 将数据块汇报后返回的命令cmd加入到命令列表cmds
  62. if (cmd != null) {
  63. cmds.add(cmd);
  64. }
  65. } else {
  66. // Send one block report per message.
  67. // 发送的数据块汇报消息数numReportsSent设置为1
  68. numReportsSent = i;
  69. // 遍历reports,取出每个StorageBlockReport
  70. for (StorageBlockReport report : reports) {
  71. StorageBlockReport singleReport[] = { report };
  72. // 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息
  73. DatanodeCommand cmd = bpNamenode.blockReport(
  74. bpRegistration, bpos.getBlockPoolId(), singleReport);
  75. // 将数据块汇报后返回的命令cmd加入到命令列表cmds
  76. if (cmd != null) {
  77. cmds.add(cmd);
  78. }
  79. }
  80. }
  81. // Log the block report processing stats from Datanode perspective
  82. // 计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中
  83. long brSendCost = now() - brSendStartTime;
  84. long brCreateCost = brSendStartTime - brCreateStartTime;
  85. dn.getMetrics().addBlockReport(brSendCost);
  86. LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
  87. " blocks total. Took " + brCreateCost +
  88. " msec to generate and " + brSendCost +
  89. " msecs for RPC and NN processing. " +
  90. " Got back commands " +
  91. (cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));
  92. // 调用scheduleNextBlockReport()方法,调度下一次数据块汇报
  93. scheduleNextBlockReport(startTime);
  94. // 返回命令cmds
  95. return cmds.size() == 0 ? null : cmds;
  96. }

数据块汇报的blockReport()方法处理流程大体如下:

1、取当前开始时间startTime;

2、如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null:

数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时;

3、构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommand;

4、调用reportReceivedDeletedBlocks()方法发送数据块增量汇报;

5、记录上次数据块增量汇报时间lastDeletedReport;

6、设置数据块汇报起始时间brCreateStartTime为当前时间;

7、从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists:

key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongs;

8、创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小;

9、取出value:BlockListAsLongs:

9.1、取出value:BlockListAsLongs;

9.2、将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports,StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组;

9.3、累加数据块数目totalBlockCount;

10、根据数据块总数目判断是否需要多次发送消息:

10.1、如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送(split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000):

10.1.1、发送的数据块汇报消息数numReportsSent设置为1;

10.1.2、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

10.1.3、将数据块汇报后返回的命令cmd加入到命令列表cmds;

10.2、如果数据块总数目在split阈值之上,将数据块汇报按照DatanodeStorage分多个消息来发送:

10.2.1、发送的数据块汇报消息数numReportsSent设置为i,即DatanodeStorage数目;

10.2.2、遍历reports,取出每个StorageBlockReport:

10.2.2.1、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

10.2.2.2、将数据块汇报后返回的命令cmd加入到命令列表cmds;

11、计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中;

12、调用scheduleNextBlockReport()方法,调度下一次数据块汇报;

13、返回命令cmds。

上一篇:mysql 表中已经存在数据 修改字段类型 varchar(11) 改为 int(11)


下一篇:linux 通过pid 寻找程序路径的最简单命令(pwdx)