RocketMQ:(7) 主从同步(HA)机制

一、RocketMQ主从复制原理

  为了提高消息消费的高可用性,避免Broker发生单点故障引起存储在Broker上的消息无法及时消费,RocketMQ引入了Broker主备机制,即消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。

HAService 整体工作机制

RocketMQ HA 的实现原理如下 。
1 )主服务器启动,并在特定端口上监听从服务器的连接。
2 )从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关TCP连接。
3 )从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器 。
4 )从服务器保存消息并继续发送新的消息同步请求 。

RocketMQ HA 7个核心类实现

1 ) HAService: RocketMQ 主从同步核心实现类 。
2 ) HAService$AcceptSocketService:HA Master 端监昕客户端连接实现类。实现Master 端监听Slave连接。
3 ) HAService$GroupTransferService:主从同步通知实现类。

  GroupTransferService的职责是负责当主从同步复制结束后通知由于等待 HA 同步结果而阻塞的消息发送者线程。判断主从同步是否完成的依据是 Slave 中已成功复制的最大偏移量是否大于等于消息生产者发送消息后消息服务端返回下一条消息的起始偏移量,如果是则表示主从同步复制已经完成,唤醒消息发送线程,否则等待 1s 再次判断,每一个任务在一批任务中循环判断 5 次。消息发送者返回有两种情况:等待超过5s或GroupTransferService通知主从复制完成 。

4 ) HAService$HAClient: HAClient是主从同步Slave端的核心实现类。

  Step1:Slave 服务器连接 Master 服务器。如果 socketChannel 为空, 则尝试连接Master,建立到Master的TCP连接。在Broker启动时,如果 Broker角色为SLAVE时将读取Broker配置文件中的haMasterAddress属性并更新 HAClient 的 masterAddrees,如果角色为 SLAVE 并且 haMasterAddress 为空,启动并不会报错,但不会执行主从同步复制,该方法最终返回是否成功连接上Master。
  Step2:判断是否需要向Master反馈当前待拉取偏移量,Master与Slave的 HA 心跳发送间隔默认为5S。
  Step3:向 Master 服务器反馈拉取偏移量 。这里有两重意义,对于 Slave 端来说,是发送下次待拉取消息偏移量,而对于 Master 服务端来说,既可以认为是 Slave 本次请求拉取的消息偏移量,也可以理解为 Slave 的消息同步 ACK 确认消息。
  Step4:进行事件选择,其执行间隔为 1s 。
  Step5:处理网络读请求,即处理从 Master 服务器传回的消息数据。

5 ) HAConnection:HA Master 服务端 HA 连接对象的封装,与 Broker 从服务器的网络读写实现类 。Master 服务器在收到从服务器的连接请求后,会将主从服务器的连接 SocketChannel 封装成 HAConnection 对象,实现主服务器与从服务器的读写操作 。

6 ) HAConnection$ReadSocketService:HA Master 网络读实现类 。
7 ) HAConnection$WriteSocketServicce:HA Master 网络写实现类 。

 

二、RocketMQ 读写分离机制

  RocketMQ 根据 MessageQueue查找 Broker 地址的唯一依据是 brokerName,从 RocketMQ 的 Broker 组织结构中得知同一组 Broker ( M-S )服务器,它们的 brokerName 相同但 brokerId 不同,主服务器的 brokerId 为 0,从服务器的 brokerId 大于 0。
  消息消费拉取线程PullMessageService根据PullRequest请求从主服务器拉取消息后会返回下一次建议拉取的brokerId,消息消费者线程在收到消息后,会根据主服务器的建议拉取brokerId来更新pullFromWhichNodeTable,pullFromWhichNodeTable缓存表中存储该消息队列的brokerId。

消息服务端是根据何种规则来建议哪个消息消费队列该从哪台 Broker 服务器上拉取消息呢?

1 ) maxOffsetPy:代表当前主服务器消息存储文件最大偏移量。
2 ) maxPhyOffsetPulling:此次拉取消息最大偏移量 。
3 ) diff:对于 PullMessageService 线程来说,当前未被拉取到消息消费端的消息长度。
4 ) TOTAL_PHYSICAL_MEMORY_SIZE:RocketMQ 所在服务器总内存大小 。accessMessagelnMemoryMaxRatio 表示 RocketMQ 所能使用的最大内存比例,超过该内存,消息将被置换出内存;memory 表示 RocketMQ 消息常驻内存的大小,超过该大小,RocketMQ会将旧的消息置换回磁盘。
5 )如果 diff 大于 memory,表示当前需要拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时才建议从从服务器拉取。
  如果主服务器繁忙则建议下一次从从服务器拉取消息,如果一个 Master 拥有多台Slave服务器,参与消息拉取负载的从服务器只会是其中一个。

 

三、总结

1)RocketMQ 的 HA 机制,其核心实现是从服务器在启动的时候主动向主服务器建立 TCP长连接,然后获取服务器的 commitlog 最大偏移量,以此偏移量向主服务器主动拉取消息,主服务器根据偏移量,与自身 commitlog 文件的最大偏移量进行比较,如果大于从服务器的 commitlog 偏移量,主服务器将向从服务器返回一定数量的消息,该过程循环进行,达到主从服务器数据同步。

2)RocketMQ 读写分离与其他中间件的实现方式完全不同,RocketMQ 是消费者首先向主服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向消息消费者建议下次消息拉取是从主服务器还是从从服务器拉取。

 

上一篇:Luogu-P3384 【模板】轻重链剖分/树链剖分


下一篇:A. Accelerator (分治+FFT)