RocketMQ Study Notes: Broker Startup


Broker Startup

Entry Point

public static void main(String[] args) {
    start(createBrokerController(args));
}

public static BrokerController start(BrokerController controller) {
    try {

        controller.start();

        String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
            + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

        if (null != controller.getBrokerConfig().getNamesrvAddr()) {
            tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
        }

        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

Creating the BrokerController

The startup flow is similar to that of the Namesrv: first, the createBrokerController method is called to build a BrokerController instance:

public static BrokerController createBrokerController(String[] args) {
    //set the RocketMQ version number as a system property (carried in the remoting protocol)
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //set the Netty socket send buffer size (128 KB) if not specified
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
        NettySystemConfig.socketSndbufSize = 131072;
    }
    //set the Netty socket receive buffer size (128 KB) if not specified
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
        NettySystemConfig.socketRcvbufSize = 131072;
    }

    try {
        //PackageConflictDetect.detectFastjson();
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                                              new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }
        //broker configuration object
        final BrokerConfig brokerConfig = new BrokerConfig();
        //Netty server config: the broker acts as a server towards producers/consumers
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        //Netty client config: the broker acts as a client towards the namesrv
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();

        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        //the remoting server listens on port 10911 by default
        nettyServerConfig.setListenPort(10911);
        //message store configuration
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        //if this broker is a slave
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            //accessMessageInMemoryMaxRatio: the maximum percentage of memory used to serve messages; beyond it messages are read from disk. master ==> 40%, slave ==> 30%
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }
        //-c specifies the configuration file
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
			
                properties2SystemEnv(properties);
                //populate brokerConfig, nettyServerConfig, nettyClientConfig and messageStoreConfig with the values from the properties file
                MixAll.properties2Object(properties, brokerConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                MixAll.properties2Object(properties, nettyClientConfig);
                MixAll.properties2Object(properties, messageStoreConfig);

                BrokerPathConfigHelper.setBrokerConfigPath(file);
                in.close();
            }
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
        //the ROCKETMQ_HOME environment variable must be set
        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        //parse and validate the namesrv addresses
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }
        //brokerId = 0 ==> master; brokerId > 0 ==> slave
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }

                break;
            default:
                break;
        }
        //whether the DLedger commit log is enabled (covered later)
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }
        //port on which the master listens for slave (HA) connections, defaults to the listen port + 1
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
        //-p: print the configuration at startup and exit
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {//-m: print only the imported (non-default) configuration at startup and exit
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }

        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);
        //create the BrokerController
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);
        //call BrokerController#initialize
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        //register a JVM shutdown hook for graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

The code above breaks down into the following parts:

  • Parsing and loading the configuration

    In total, four objects — brokerConfig, nettyServerConfig, nettyClientConfig and messageStoreConfig — are created to hold the configuration, and a BrokerController object is then built from them; a sketch of how the property-to-object mapping works follows the sample configuration below.

    A typical broker configuration file, whose properties are mapped onto these config objects, looks like this:

    #cluster this broker belongs to
    brokerClusterName = DefaultCluster
    #broker name; note that each broker's config file uses a different value here
    brokerName = broker-a
    #0 means master, >0 means slave
    brokerId = 0
    #nameServer addresses, separated by semicolons
    namesrvAddr=rocketmq-namesrv1:9876;rocketmq-namesrv2:9876
    #number of queues created by default when a topic is auto-created on send
    defaultTopicQueueNums=4
    #whether the broker may auto-create topics; recommended on for test environments, off in production
    autoCreateTopicEnable=true
    #whether the broker may auto-create subscription groups; recommended on for test environments, off in production
    autoCreateSubscriptionGroup=true
    #port the broker listens on for client traffic
    listenPort=10911
    #time of day at which expired files are deleted, defaults to 4 AM
    deleteWhen = 04
    #file retention time, defaults to 48 hours
    fileReservedTime = 48
    # size of each commitLog file, defaults to 1G
    mapedFileSizeCommitLog=1073741824
    # each ConsumeQueue file holds 300,000 entries by default; adjust to the workload
    mapedFileSizeConsumeQueue=30000
    # destroyMapedFileIntervalForcibly=12000
    # redeleteHangedFileInterval=12000
    # disk usage threshold that triggers cleanup of physical files
    diskMaxUsedSpaceRatio=88
    # store root path
    storePathRootDir=/usr/local/server/mq/rocketmq/store
    # commitLog storage path
    storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
    # consume queue storage path
    storePathConsumeQueue=/usr/local/server/mq/rocketmq/store/consumequeue
    # message index storage path
    storePathIndex=/usr/local/server/mq/rocketmq/store/index
    # checkpoint file path
    storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
    # abort file path
    abortFile=/usr/local/server/mq/rocketmq/store/abort
    # maximum message size
    maxMessageSize=65536
    # flushCommitLogLeastPages=4
    # flushConsumeQueueLeastPages=2
    # flushCommitLogThoroughInterval=10000
    # flushConsumeQueueThoroughInterval=60000
    # broker role
    # -ASYNC_MASTER: asynchronously replicating master
    # -SYNC_MASTER: synchronous dual-write master
    # -SLAVE
    brokerRole=ASYNC_MASTER
    # flush (disk write) mode
    # - ASYNC_FLUSH asynchronous flush
    # - SYNC_FLUSH synchronous flush
    flushDiskType=ASYNC_FLUSH
    # checkTransactionMessageEnable=false
    # number of send-message threads
    # sendMessageThreadPoolNums=128
    # number of pull-message threads
    # pullMessageThreadPoolNums=128
    

    The remaining configuration items are not covered in detail here.
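
    The mapping from the properties file onto the four config objects is done by MixAll.properties2Object. The class below is only a rough, illustrative sketch of that reflection-based idea (the class and method names here are hypothetical, not RocketMQ code): for each property, find a matching setter on the target bean and convert the string value to the setter's parameter type.

    import java.lang.reflect.Method;
    import java.util.Properties;

    public class PropertiesToObjectSketch {
        public static void fill(Properties props, Object target) {
            for (String name : props.stringPropertyNames()) {
                String value = props.getProperty(name);
                // "listenPort=10911" -> look for setListenPort(...)
                String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
                for (Method m : target.getClass().getMethods()) {
                    if (!m.getName().equals(setter) || m.getParameterCount() != 1) {
                        continue;
                    }
                    try {
                        Class<?> type = m.getParameterTypes()[0];
                        Object arg = value;
                        if (type == int.class || type == Integer.class) {
                            arg = Integer.parseInt(value);
                        } else if (type == long.class || type == Long.class) {
                            arg = Long.parseLong(value);
                        } else if (type == boolean.class || type == Boolean.class) {
                            arg = Boolean.parseBoolean(value);
                        }
                        m.invoke(target, arg);
                    } catch (Exception ignored) {
                        // unparseable or mismatched property: skip it, the real helper is similarly lenient
                    }
                }
            }
        }
    }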

  • Creating and initializing the BrokerController

    The BrokerController constructor:

    public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        //assign the four configuration objects
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        //manages the consumption progress of each consumer group; clustering consumers report their offsets back to the broker after consuming successfully
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        //TopicConfigManager: manages all topics and queue metadata on this broker; topic data is periodically synced to the namesrv to refresh its route information
        this.topicConfigManager = new TopicConfigManager(this);
        //handles consumers' pull-message requests; built on Netty, it parses the request, runs the business logic and returns the data
        this.pullMessageProcessor = new PullMessageProcessor(this);
        //long-polling service (a plain Java thread) that holds pull requests and notifies clients to pull once new messages arrive
        this.pullRequestHoldService = new PullRequestHoldService(this);
        //listener for newly arrived messages; works together with PullRequestHoldService to notify waiting pull requests
        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        //manages consumers: registration, unregistration, shutdown, etc.
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        //consumer-side filter management, tying together topic, consumer and filter expression
        this.consumerFilterManager = new ConsumerFilterManager(this);
        //manages producers: registration, unregistration, shutdown, etc.
        this.producerManager = new ProducerManager();
        //Netty-based housekeeping service that watches client channel connect/close/exception/idle events
        this.clientHousekeepingService = new ClientHousekeepingService(this);
        //wrapper for requests the broker sends to clients: notifications, offset resets, status queries, etc.
        this.broker2Client = new Broker2Client(this);
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
        //wrapper for the broker's outbound requests; talks to the namesrv through the Netty client
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        //management of (shell-started) filter servers
        this.filterServerManager = new FilterServerManager(this);

        this.slaveSynchronize = new SlaveSynchronize(this);
        //initialize the various thread pool queues
        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
        this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
        //broker statistics manager: records the broker's runtime performance metrics
        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
    
        this.brokerFastFailure = new BrokerFastFailure(this);
        this.configuration = new Configuration(
            log,
            BrokerPathConfigHelper.getBrokerConfigPath(),
            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );
    }
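
    The bounded LinkedBlockingQueues created above back the broker's request-handling thread pools; when a queue fills up, the broker's fast-failure mechanism can reject requests quickly instead of letting them pile up. A tiny, self-contained illustration of a bounded queue refusing extra work (the capacity of 2 is arbitrary here, real capacities come from BrokerConfig):

    import java.util.concurrent.LinkedBlockingQueue;

    public class BoundedQueueDemo {
        public static void main(String[] args) {
            // when the queue is full, offer() fails and the broker can answer "system busy"
            LinkedBlockingQueue<Runnable> sendQueue = new LinkedBlockingQueue<>(2);
            for (int i = 0; i < 3; i++) {
                boolean accepted = sendQueue.offer(() -> { });
                System.out.println("task " + i + (accepted ? " queued" : " rejected (queue full)"));
            }
        }
    }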
    

    ProducerManager: manages and maintains producers, providing registration, unregistration, shutdown and so on:

    public class ProducerManager {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
        private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
        //tracks producers' physical connections, grouped by producer group
        private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
            new ConcurrentHashMap<>();
        private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
        private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
    
        public ProducerManager() {
        }
        ...
    }
    

    ConsumerManager: manages and maintains consumers, providing registration, unregistration, shutdown and so on:

    public class ConsumerManager {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
        //maps a consumer group name to its detailed group information
        private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
            new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
        //handles consumer connection, disconnection and change events
        private final ConsumerIdsChangeListener consumerIdsChangeListener;
    
        public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
            this.consumerIdsChangeListener = consumerIdsChangeListener;
        }
        ...
        
    }
    

    A Map keeps the consumer group names and their details; the ConsumerGroupInfo data structure looks like this:

    public class ConsumerGroupInfo {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        //consumer group name
        private final String groupName;
        //topic subscriptions; SubscriptionData mainly records the topic's tag information
        private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
            new ConcurrentHashMap<String, SubscriptionData>();
        //physical connection information
        private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
            new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
        //consume type: PULL || PUSH
        private volatile ConsumeType consumeType;
        //message model: BROADCASTING || CLUSTERING
        private volatile MessageModel messageModel;
        //where consumption starts from (consume-from-where policy)
        private volatile ConsumeFromWhere consumeFromWhere;
        //timestamp of the last update
        private volatile long lastUpdateTimestamp = System.currentTimeMillis();
    }
    

    ConsumerOffsetManager: manages the consumption progress of each consumer group. Clustering consumers report their offsets back to the broker after messages are consumed successfully:

    public class ConsumerOffsetManager extends ConfigManager {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private static final String TOPIC_GROUP_SEPARATOR = "@";
    
        private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
            new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
    
        private transient BrokerController brokerController;
    
        public ConsumerOffsetManager() {
        }
    
        public ConsumerOffsetManager(BrokerController brokerController) {
            this.brokerController = brokerController;
        }
        ...
        public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
            final long offset) {
            // topic@group
            String key = topic + TOPIC_GROUP_SEPARATOR + group;
            this.commitOffset(clientHost, key, queueId, offset);
        }
        //update the offset
        private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
            ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
            if (null == map) {
                map = new ConcurrentHashMap<Integer, Long>(32);
                map.put(queueId, offset);
                this.offsetTable.put(key, map);
            } else {
                Long storeOffset = map.put(queueId, offset);
                if (storeOffset != null && offset < storeOffset) {
                    log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
                }
            }
        }
        //query the offset
        public long queryOffset(final String group, final String topic, final int queueId) {
            // topic@group
            String key = topic + TOPIC_GROUP_SEPARATOR + group;
            ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
            if (null != map) {
                Long offset = map.get(queueId);
                if (offset != null)
                    return offset;
            }
    
            return -1;
        }
    }
    

    It is easy to see that the outer Map's key is topic@group and its value is another Map; the inner Map's key is the queueId and its value is the offset. A small illustrative example follows.
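
    A tiny, self-contained illustration of that two-level structure (the topic and group names below are made up for the example):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class OffsetTableDemo {
        public static void main(String[] args) {
            // outer key: "topic@group", inner key: queueId, inner value: consume offset
            ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();

            ConcurrentMap<Integer, Long> queueOffsets = new ConcurrentHashMap<>();
            queueOffsets.put(0, 128L);   // queue 0 of TopicTest consumed up to offset 128
            queueOffsets.put(1, 96L);    // queue 1 consumed up to offset 96
            offsetTable.put("TopicTest@example_consumer_group", queueOffsets);

            // queryOffset("example_consumer_group", "TopicTest", 0) would build the same key and return 128
            System.out.println(offsetTable.get("TopicTest@example_consumer_group").get(0));
        }
    }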

    TopicConfigManager: manages all topics and queue metadata on this broker. Topic data is periodically synced to the namesrv to refresh the topic route information there:

    public class TopicConfigManager extends ConfigManager {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private static final long LOCK_TIMEOUT_MILLIS = 3000;
        private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
    
        private transient final Lock lockTopicConfigTable = new ReentrantLock();
        //key: topic, value: TopicConfig (read/write queue counts, permissions, etc.)
        private final ConcurrentMap<String, TopicConfig> topicConfigTable =
            new ConcurrentHashMap<String, TopicConfig>(1024);
        private final DataVersion dataVersion = new DataVersion();
        private transient BrokerController brokerController;
    
        public TopicConfigManager() {
        }
    
        public TopicConfigManager(BrokerController brokerController) {
            this.brokerController = brokerController;
            {
                String topic = TopicValidator.RMQ_SYS_SELF_TEST_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                topicConfig.setReadQueueNums(1);
                topicConfig.setWriteQueueNums(1);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
            {
                if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                    String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
                    TopicConfig topicConfig = new TopicConfig(topic);
                    TopicValidator.addSystemTopic(topic);
                    topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                        .getDefaultTopicQueueNums());
                    topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                        .getDefaultTopicQueueNums());
                    int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                    topicConfig.setPerm(perm);
                    this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
                }
            }
            {
                String topic = TopicValidator.RMQ_SYS_BENCHMARK_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                topicConfig.setReadQueueNums(1024);
                topicConfig.setWriteQueueNums(1024);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
            {
    
                String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                int perm = PermName.PERM_INHERIT;
                if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
                    perm |= PermName.PERM_READ | PermName.PERM_WRITE;
                }
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
            {
    
                String topic = this.brokerController.getBrokerConfig().getBrokerName();
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                int perm = PermName.PERM_INHERIT;
                if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
                    perm |= PermName.PERM_READ | PermName.PERM_WRITE;
                }
                topicConfig.setReadQueueNums(1);
                topicConfig.setWriteQueueNums(1);
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
            {
                String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT;
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                topicConfig.setReadQueueNums(1);
                topicConfig.setWriteQueueNums(1);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
            {
                String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
                topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
            {
                if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
                    String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
                    TopicConfig topicConfig = new TopicConfig(topic);
                    TopicValidator.addSystemTopic(topic);
                    topicConfig.setReadQueueNums(1);
                    topicConfig.setWriteQueueNums(1);
                    this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
                }
            }
            {
                String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX;
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                topicConfig.setReadQueueNums(1);
                topicConfig.setWriteQueueNums(1);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
        }
    }
    

    The Map stores each topicName together with its corresponding TopicConfig; the perm field is a bitmask combining read, write and inherit flags, as sketched below.
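
    To make the perm field concrete, here is a small sketch of how such a read/write/inherit bitmask behaves; the flag values below are illustrative stand-ins for the constants defined in PermName, not copied from the source:

    public class TopicPermDemo {
        // illustrative flag values; RocketMQ defines the real ones in PermName
        static final int PERM_INHERIT = 0x1;
        static final int PERM_WRITE = 0x1 << 1;
        static final int PERM_READ = 0x1 << 2;

        public static void main(String[] args) {
            // same kind of combination as in the TopicConfigManager constructor above
            int perm = PERM_INHERIT | PERM_READ | PERM_WRITE;
            System.out.println("readable:    " + ((perm & PERM_READ) == PERM_READ));
            System.out.println("writable:    " + ((perm & PERM_WRITE) == PERM_WRITE));
            System.out.println("inheritable: " + ((perm & PERM_INHERIT) == PERM_INHERIT));
        }
    }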

    The initialize method:

    public boolean initialize() throws CloneNotSupportedException {
        //load the persisted configuration files (topics, consumer offsets, subscription groups, consumer filters)
        boolean result = this.topicConfigManager.load();
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();
    
        if (result) {
            try {
                //create the message store; the CommitLog-based DefaultMessageStore is used by default
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);
                //whether DLedger is enabled (disabled by default)
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                }
                //used for statistics
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load the plugin context
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }
        //load CommitLog, ConsumeQueue and IndexFile from disk and recover the progress pointers
        result = result && this.messageStore.load();

        if (result) {
            //create the Netty remoting server; clientHousekeepingService watches the liveness of client (Producer/Consumer) connections
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            //VIP channel (listen port - 2); it does not serve consumers' pull requests
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            //thread pool for producers' send-message requests
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));
            //thread pool for consumers' pull requests
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));
            //thread pool for reply messages
            this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.replyThreadPoolQueue,
                new ThreadFactoryImpl("ProcessReplyMessageThread_"));
            //thread pool for message queries
            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));
            //thread pool for broker admin requests
            this.adminBrokerExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                    "AdminBrokerThread_"));
            //thread pool for client management requests
            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));
            //thread pool for heartbeat handling
            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.heartbeatThreadPoolQueue,
                new ThreadFactoryImpl("HeartbeatThread_", true));
            //thread pool for ending transactions (commit/rollback)
            this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.endTransactionThreadPoolQueue,
                new ThreadFactoryImpl("EndTransactionThread_"));
            //thread pool for consumer management requests
            this.consumerManageExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                    "ConsumerManageThread_"));
            //register the request processors together with the thread pools that serve them
            this.registerProcessor();
            //compute the delay until the first run (next midnight)
            final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            //scheduled task: record broker statistics once a day
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);
            //persist the consumer offsets periodically (initial delay 10s, period = flushConsumerOffsetInterval)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
            //persist the consumer filter data every 10 seconds
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
            //every 3 minutes, disable consumer groups that fall too far behind (broker protection)
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);
            //every second, print the water marks of the send/pull/query/transaction request queues
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);
            //every minute, log how many bytes the dispatch lags behind the commit log
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            //if the namesrv addresses were loaded from the configuration, update the local name server list
            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                //otherwise, if fetching from an address server is enabled, refresh the list every two minutes
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }
            //DLedger is disabled by default, so the classic master/slave HA path below is taken
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                        this.updateMasterHAServerAddrPeriodically = false;
                    } else {
                        this.updateMasterHAServerAddrPeriodically = true;
                    }
                } else {
                    //on the master, periodically print how far the slave lags behind
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                }
            }
    
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                try {
                    fileWatchService = new FileWatchService(
                        new String[] {
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                        },
                        new FileWatchService.Listener() {
                            boolean certChanged, keyChanged = false;
    
                            @Override
                            public void onChanged(String path) {
                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                    log.info("The trust certificate changed, reload the ssl context");
                                    reloadServerSslContext();
                                }
                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                    certChanged = true;
                                }
                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                    keyChanged = true;
                                }
                                if (certChanged && keyChanged) {
                                    log.info("The certificate and private key changed, reload the ssl context");
                                    certChanged = keyChanged = false;
                                    reloadServerSslContext();
                                }
                            }
    
                            private void reloadServerSslContext() {
                                ((NettyRemotingServer) remotingServer).loadSslContext();
                                ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                            }
                        });
                } catch (Exception e) {
                    log.warn("FileWatchService created error, can't load the certificate dynamically");
                }
            }
            //load the transactional message service
            initialTransaction();
            //initialize the Access Control List (ACL)
            initialAcl();
            //register RPC hooks
            initialRpcHooks();
        }
        return result;
    }
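
    Pulling together the port numbers that appear in the snippets above (listen port 10911, VIP channel at listen port - 2, HA at listen port + 1):

    public class BrokerPortLayoutDemo {
        public static void main(String[] args) {
            int listenPort = 10911;               // remotingServer: ordinary producer/consumer traffic
            int vipChannelPort = listenPort - 2;  // fastRemotingServer: 10909, does not serve pull requests
            int haListenPort = listenPort + 1;    // HA service: 10912, master <-> slave replication
            System.out.printf("listen=%d, vip=%d, ha=%d%n", listenPort, vipChannelPort, haListenPort);
        }
    }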
    
    

    First, the DefaultMessageStore is created:

    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        this.messageArrivingListener = messageArrivingListener;
        this.brokerConfig = brokerConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.brokerStatsManager = brokerStatsManager;
        //service that pre-allocates mapped files
        this.allocateMappedFileService = new AllocateMappedFileService(this);
        //whether the DLedger commit log is enabled; the plain CommitLog is used by default
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        //consume queue table: {topic: {queueId: consumeQueue}}
        this.consumeQueueTable = new ConcurrentHashMap<>(32);
        //service that flushes consume queues to disk
        this.flushConsumeQueueService = new FlushConsumeQueueService();
        //service that cleans up expired CommitLog files, mainly when disk usage gets too high
        this.cleanCommitLogService = new CleanCommitLogService();
        //service that cleans up consume queue files
        this.cleanConsumeQueueService = new CleanConsumeQueueService();
        //store statistics service
        this.storeStatsService = new StoreStatsService();
        //index service
        this.indexService = new IndexService(this);
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            //HA service, mainly responsible for master-slave replication
            this.haService = new HAService(this);
        } else {
            this.haService = null;
        }
        //service that re-dispatches newly appended commit log entries (builds consume queues and indexes)
        this.reputMessageService = new ReputMessageService();
        //delayed (scheduled) message service
        this.scheduleMessageService = new ScheduleMessageService(this);
    		
        this.transientStorePool = new TransientStorePool(messageStoreConfig);
    
        if (messageStoreConfig.isTransientStorePoolEnable()) {
            this.transientStorePool.init();
        }
        //start the mapped file allocation service
        this.allocateMappedFileService.start();
        //start the index service
        this.indexService.start();
        //dispatcher list
        this.dispatcherList = new LinkedList<>();
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
        //lock file under the message store root path
        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
        MappedFile.ensureDirOK(file.getParent());
        //opened with read-write access
        lockFile = new RandomAccessFile(file, "rw");
    }
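
    The dispatcherList set up at the end of the constructor is what the ReputMessageService later feeds every newly appended commit log entry through. A minimal, simplified sketch of that chain idea (the Dispatcher interface here is a stand-in for CommitLogDispatcher, not the real type):

    import java.util.LinkedList;

    public class DispatcherChainDemo {
        // simplified stand-in for org.apache.rocketmq.store.CommitLogDispatcher
        interface Dispatcher { void dispatch(String msg); }

        public static void main(String[] args) {
            LinkedList<Dispatcher> dispatcherList = new LinkedList<>();
            dispatcherList.addLast(m -> System.out.println("build consume queue entry for " + m));
            dispatcherList.addLast(m -> System.out.println("build index entry for " + m));
            // BrokerController#initialize later calls addFirst(CommitLogDispatcherCalcBitMap)
            dispatcherList.addFirst(m -> System.out.println("calc filter bitmap for " + m));

            // the reput service would run every newly appended message through the chain
            for (Dispatcher d : dispatcherList) {
                d.dispatch("msg-1");
            }
        }
    }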
    
    

    DefaultMessageStore starts several services that manage the broker's storage; once the object has been created, its load method is called:

    public boolean load() {
        boolean result = true;
    
        try {
            //check whether the abort file under ${user.home}/store exists; if it does, the last shutdown was abnormal
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
            //load the delivery progress of delayed (scheduled) messages
            if (null != scheduleMessageService) {
                result = result && this.scheduleMessageService.load();
            }
    
            // load the Commit Log
            result = result && this.commitLog.load();

            // load the Consume Queues
            result = result && this.loadConsumeQueue();

            if (result) {
                //load the checkpoint file
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
                //load the index files, taking into account whether the last exit was clean
                this.indexService.load(lastExitOK);
                //recover the ConsumeQueue and CommitLog pointers and the topic-queue table, depending on whether the last exit was clean
                this.recover(lastExitOK);
    
                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
            }
        } catch (Exception e) {
            log.error("load exception", e);
            result = false;
        }
    
        if (!result) {
            this.allocateMappedFileService.shutdown();
        }
    
        return result;
    }
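
    The isTempFileExist check above looks for the abort marker file under the store root (see the abortFile entry in the sample configuration): the broker creates it when it starts and removes it on a clean shutdown, so finding it at boot means the previous run died abnormally. A minimal stand-alone illustration of that check (the path below is the default location, not read from any config):

    import java.io.File;

    public class AbortFileCheckDemo {
        public static void main(String[] args) {
            // present => the previous run exited abnormally; absent => clean shutdown
            File abortFile = new File(System.getProperty("user.home") + "/store/abort");
            boolean lastExitOK = !abortFile.exists();
            System.out.println("last shutdown " + (lastExitOK ? "normally" : "abnormally"));
        }
    }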
    
    

    CommitLog:

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        //directory: ${user.home}/store/commitlog
        //single file size defaults to 1024*1024*1024 (1 GB)
            this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
            this.defaultMessageStore = defaultMessageStore;
            //synchronous flush
            if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                this.flushCommitLogService = new GroupCommitService();
            //asynchronous flush
            } else {
                this.flushCommitLogService = new FlushRealTimeService();
            }
            //commit service used when the transient store pool is enabled
            this.commitLogService = new CommitRealTimeService();
            //message appending is performed inside this callback
            this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
            batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
                @Override
                protected MessageExtBatchEncoder initialValue() {
                    return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
                }
            };
            this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
    
        }
    

    The CommitLog is made up of MappedFile segments, 1 GB each by default; during initialization the relevant data is mapped into memory through MappedFile objects:

    CommitLog.load

    public boolean load() {
        boolean result = this.mappedFileQueue.load();
        log.info("load commit log " + (result ? "OK" : "Failed"));
        return result;
    }
    
    

    MappedFileQueue.load

    public boolean load() {
        //list all files under the store directory
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order: each MappedFile is named after the commit log offset of its first message, so sorting by file name sorts by offset
            Arrays.sort(files);
            for (File file : files) {
                //the file size does not match the configured mapped file size
                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                             + " length not matched message store config value, please check it manually");
                    return false;
                }
    			
                try {
                    //create the MappedFile memory mapping
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
                    //initially set all pointers to the file size (their maximum value)
                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }
    
        return true;
    }
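
    As the comment above notes, each mapped file is named after the starting offset of its first byte (zero-padded to 20 digits), so the names sort in offset order. For the commit log with the default 1 GB file size, the names look like this (illustrative calculation):

    public class CommitLogFileNameDemo {
        public static void main(String[] args) {
            long fileSize = 1024L * 1024 * 1024;          // 1 GB per commit log file (default)
            long[] startOffsets = {0, fileSize, 2 * fileSize};
            for (long offset : startOffsets) {
                // the file name is the 20-digit, zero-padded starting offset of the file
                System.out.println(String.format("%020d", offset));
            }
            // prints:
            // 00000000000000000000
            // 00000000001073741824
            // 00000000002147483648
        }
    }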
    
    

    DefaultMessageStore.loadConsumeQueue

    private boolean loadConsumeQueue() {
        // the ${user.home}/store/consumequeue directory
        // layout: ${user.home}/store/consumequeue/{topic}/{queueid}/{filename}
        File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
        //list of topic directories
        File[] fileTopicList = dirLogic.listFiles();
        if (fileTopicList != null) {
    
            for (File fileTopic : fileTopicList) {
                // the topic name is the directory name
                String topic = fileTopic.getName();
                // queue directories under this topic directory
                File[] fileQueueIdList = fileTopic.listFiles();
                if (fileQueueIdList != null) {
                    // iterate over the queue directories of this topic
                    for (File fileQueueId : fileQueueIdList) {
                        int queueId;
                        try {
                            // the queue id is the directory name
                            queueId = Integer.parseInt(fileQueueId.getName());
                        } catch (NumberFormatException e) {
                            continue;
                        }
                        //create a ConsumeQueue for this topic and queueId
                        ConsumeQueue logic = new ConsumeQueue(
                            topic,
                            queueId,
                            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                            this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                            this);
                        // add it to this.consumeQueueTable,
                        // whose structure is ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>>
                        this.putConsumeQueue(topic, queueId, logic);
                        // load the ConsumeQueue, similar to how the CommitLog is loaded
                        if (!logic.load()) {
                            return false;
                        }
                    }
                }
            }
        }
    
        log.info("load logics queue all over, OK");
    
        return true;
    }
    
    

    indexService.load

    public boolean load(final boolean lastExitOK) {
        // the ${user.home}/store/index directory
        File dir = new File(this.storePath);
        // list of IndexFile files in the directory
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {
                try {
                    IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);			
                    // load the IndexFile and recover the index
                    f.load();
                    // the last shutdown was abnormal
                    if (!lastExitOK) {
                        // the timestamp of the newest entry in this IndexFile is newer than the checkpoint
                        if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                            .getIndexMsgTimestamp()) {
                            // destroy the file and skip it
                            f.destroy(0);
                            continue;
                        }
                    }
    
                    // add it to the index file list
                    log.info("load index file OK, " + f.getFileName());
                    this.indexFileList.add(f);
                } catch (IOException e) {
                    log.error("load file {} error", file, e);
                    return false;
                } catch (NumberFormatException e) {
                    log.error("load file {} error", file, e);
                }
            }
        }
    
        return true;
    }
    
    

    An IndexFile makes it possible to look up messages by key or by time:

    • Default location is ${user.home}/store/index/{fileName}, where the file name is the creation timestamp
    • An IndexFile is 40 + 5000000 * 4 + 20000000 * 20 = 420000040 bytes and consists of, in order (the arithmetic is spelled out in the sketch after this list):
    • IndexHeader: 40 bytes in total
      • BeginTimestamp: timestamp of the first index entry, 8 bytes
      • EndTimestamp: timestamp of the last index entry, 8 bytes
      • BeginPhyOffset: physical offset in the CommitLog of the message behind the first index entry, 8 bytes
      • EndPhyOffset: physical offset in the CommitLog of the message behind the last index entry, 8 bytes
      • HashSlotCount: current number of slots, 4 bytes
      • IndexCount: current number of index entries, 4 bytes
    • SlotTable: the list of hash slots; each slot stores the sequence number of the latest index entry hashed to it, 4 bytes per slot, 5,000,000 slots by default
    • IndexLinkedList: the list of index entries; each entry records the position of the previous entry in the same slot, forming a linked list, 20 bytes per entry, 20,000,000 entries by default
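
    A small sketch that checks the size arithmetic and shows how a key is mapped to a slot; the key string and the hashing details are simplified for illustration only:

    public class IndexFileLayoutDemo {
        public static void main(String[] args) {
            int headerSize = 40;              // IndexHeader
            int slotSize = 4;                 // each hash slot stores the latest index position
            int indexSize = 20;               // each index entry
            int slotNum = 5_000_000;          // default hash slot count
            int indexNum = 20_000_000;        // default index entry count

            long fileSize = headerSize + (long) slotNum * slotSize + (long) indexNum * indexSize;
            System.out.println(fileSize);     // 420000040, matching the figure above

            // looking up a key: hash it, take the slot, then walk the linked index entries
            String key = "TopicTest#OrderId-1001";          // hypothetical message key
            int slotPos = (key.hashCode() & 0x7fffffff) % slotNum;
            long slotOffset = headerSize + (long) slotPos * slotSize;
            System.out.println("slot " + slotPos + " lives at byte offset " + slotOffset);
        }
    }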

    DefaultMessageStore.recover:

    private void recover(final boolean lastExitOK) {
        // recover the ConsumeQueue flush pointers and return the maximum physical offset they reference
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
        //the last exit was clean
        if (lastExitOK) {
            //normal recovery of the CommitLog pointers
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        //the last exit was abnormal
        } else {
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }
    
        this.recoverTopicQueueTable();
    }
    
    

    DefaultMessageStore#recoverConsumeQueue:

    private long recoverConsumeQueue() {
        long maxPhysicOffset = -1;
        //iterate over all topics
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            //iterate over all message queues of the topic
            for (ConsumeQueue logic : maps.values()) {
                //verify the ConsumeQueue contents and recover its flush pointer
                logic.recover();
                if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                    //update the maximum physical offset
                    maxPhysicOffset = logic.getMaxPhysicOffset();
                }
            }
        }
    
        return maxPhysicOffset;
    }
    
    

    ConsumeQueue#recover: restores the MappedFileQueue pointers

    public void recover() {
        //get all MappedFile segments of this consume queue
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
    
            //start from the third-from-last file (at most the last three files are checked)
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;
    		
            //size of a single consume queue file
            int mappedFileSizeLogics = this.mappedFileSize;
            //the file at the starting index
            MappedFile mappedFile = mappedFiles.get(index);
            //create a sliced byte buffer over the mapped file
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            //starting offset of this file
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            long maxExtAddr = 1;
            while (true) {
                //iterate over all entries; each entry is 20 bytes (CQ_STORE_UNIT_SIZE)
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();
    
                    if (offset >= 0 && size > 0) {
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        //track the maximum physical offset referenced so far
                        this.maxPhysicOffset = offset + size;
                        if (isExtAddr(tagsCode)) {
                            maxExtAddr = tagsCode;
                        }
                    } else {
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                                 + offset + " " + size + " " + tagsCode);
                        break;
                    }
                } //end of the for loop: the current file has been fully checked

                //the whole file was checked and every unit was valid
                if (mappedFileOffset == mappedFileSizeLogics) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        //all files have been checked
                        log.info("recover last consume queue file over, last mapped file "
                                 + mappedFile.getFileName());
                        break;
                    } else {
                        //move on and check the next file
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    //this file is corrupted beyond this point; no need to check further
                    log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                             + (processOffset + mappedFileOffset));
                    break;
                }
            }
    
            //checking is done; compute the maximum valid offset
            processOffset += mappedFileOffset;
            //set the flush and commit pointers
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            //truncate the invalid files after this offset
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
    
            if (isExtReadEnable()) {
                this.consumeQueueExt.recover();
                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        }
    }
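
    The loop above reads the file in 20-byte units (CQ_STORE_UNIT_SIZE); each unit holds the message's commit log offset, its size and a tags hash code. A small illustrative round trip through that layout (the values are made up):

    import java.nio.ByteBuffer;

    public class ConsumeQueueUnitDemo {
        // each consume queue entry: 8-byte commit log offset + 4-byte message size + 8-byte tags hash code
        static final int CQ_STORE_UNIT_SIZE = 20;

        public static void main(String[] args) {
            ByteBuffer unit = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
            unit.putLong(1_073_741_824L);    // physical offset of the message in the commit log
            unit.putInt(256);                // message size in bytes
            unit.putLong("TagA".hashCode()); // tags code used for server-side tag filtering
            unit.flip();

            // mirrors the reads performed in ConsumeQueue#recover above
            System.out.println("offset=" + unit.getLong() + ", size=" + unit.getInt() + ", tagsCode=" + unit.getLong());
        }
    }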
    
    

    DefaultMessageStore#recoverTopicQueueTable: rebuilds the topic-queue offset table and corrects each queue's minimum offset

    public void recoverTopicQueueTable() {
        HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
        long minPhyOffset = this.commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                String key = logic.getTopic() + "-" + logic.getQueueId();
                table.put(key, logic.getMaxOffsetInQueue());
                logic.correctMinOffset(minPhyOffset);
            }
        }
        //hand the topic-queue offset table over to the CommitLog
        this.commitLog.setTopicQueueTable(table);
    }
    
    

    Registering the request processors and their thread pools:

    BrokerController.registerProcessor

    public void registerProcessor() {
        /**
             * SendMessageProcessor
             */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
        //handle producers' send-message requests
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        //handle producers' batch send requests
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        //handle messages that consumers send back to the broker for later re-consumption
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        // the VIP channel (fastRemotingServer) shares the same processors and thread pools; it only listens on a different port
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        /**
             * PullMessageProcessor
             */
        // 处理Consumer拉取消息请求,VIPChannel没有这个
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        //钩子
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
        /**
             * ReplyMessageProcessor 处理reply消息(request-reply模式下消费者回复给生产者的应答消息)
             */
        ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
        replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
    
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
    
        /**
             * QueryMessageProcessor 查询消息
             */
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
        /**
             * ClientManageProcessor  客户端请求处理
             */
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
        /**
             * ConsumerManageProcessor 消费端请求处理
             */
        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
        /**
             * EndTransactionProcessor 事务请求处理
             */
        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    
        /**
             * Default
             */
        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    }
    
  • 注册JVM钩子函数并调用BrokerController.shutdown()函数实现优雅关闭

    public void shutdown() {
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.shutdown();
        }
    
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.shutdown();
        }
    
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.shutdown();
        }
    
        if (this.remotingServer != null) {
            this.remotingServer.shutdown();
        }
    
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.shutdown();
        }
    
        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    
        if (this.messageStore != null) {
            this.messageStore.shutdown();
        }
    
        this.scheduledExecutorService.shutdown();
        try {
            this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    
        this.unregisterBrokerAll();
    
        if (this.sendMessageExecutor != null) {
            this.sendMessageExecutor.shutdown();
        }
    
        if (this.pullMessageExecutor != null) {
            this.pullMessageExecutor.shutdown();
        }
    
        if (this.replyMessageExecutor != null) {
            this.replyMessageExecutor.shutdown();
        }
    
        if (this.adminBrokerExecutor != null) {
            this.adminBrokerExecutor.shutdown();
        }
    
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.shutdown();
        }
    
        this.consumerOffsetManager.persist();
    
        if (this.filterServerManager != null) {
            this.filterServerManager.shutdown();
        }
    
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.shutdown();
        }
    
        if (this.consumerFilterManager != null) {
            this.consumerFilterManager.persist();
        }
    
        if (this.clientManageExecutor != null) {
            this.clientManageExecutor.shutdown();
        }
    
        if (this.queryMessageExecutor != null) {
            this.queryMessageExecutor.shutdown();
        }
    
        if (this.consumerManageExecutor != null) {
            this.consumerManageExecutor.shutdown();
        }
    
        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.shutdown(false);
        }
    
        if (this.endTransactionExecutor != null) {
            this.endTransactionExecutor.shutdown();
        }
    }
    
    

调用start方法

资源加载完毕,调用start方法真正启动:

public void start() throws Exception {
    	//启动消息存储服务
        if (this.messageStore != null) {
            this.messageStore.start();
        }
		//启动netty服务端,监听请求
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
		//VIP通道
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }
		//启动TLS证书检测服务
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
		//启动netty客户端,连接namesrv
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
		// 启动PushConsumer的PullRequestHoldService
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }
		//启动监控服务,每十秒钟检测producer,consumer,filterserver是否正常
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }
		// 启动FilterServer管理计划任务,每三十秒运行一次启动脚本,保持一定数量的FilterServer运行
            // FilterServer会向Broker注册,在Broker和Consumer间起过滤消息的作用,由直接拉取消息变成了:Consumer -> FilterServer -> Broker
            // 大概是想减轻Consumer的过滤压力?或者是过滤掉不应该由Consumer看到的敏感消息?
        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            // 如果是Master,启动事务消息检查服务
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            // 如果是Slave,通过计划任务,每十秒执行一次,通过VIP通道向Master同步配置并更新本地缓存及持久化,包括TopicConfig,ConsumerOffset,DelayOffset及SubscriptionGroupConfig
            // 这里只同步配置,CommitLog在HAService同步
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            // 发送心跳,向NameServer广播REGISTER_BROKER的单向请求,包含Topic信息,NameServer据此决定路由信息
            this.registerBrokerAll(true, false, true);
        }
		// 计划任务,每十秒一次,向NameServer注册
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
		//启动统计服务
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }
		//启动快速失败,定时清理超时客户端请求
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }


    }
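
其中注册任务的执行周期由registerNameServerPeriod(默认30秒)决定,并被钳制在10秒到60秒之间。下面用一小段示例代码演示这个取值逻辑(仅为示意,非源码):

public class RegisterPeriodDemo {
    public static void main(String[] args) {
        System.out.println(period(30_000));  // 30000:默认值,原样使用
        System.out.println(period(5_000));   // 10000:配置过小,被抬高到10秒
        System.out.println(period(120_000)); // 60000:配置过大,被压低到60秒
    }

    //与start方法中scheduleAtFixedRate的周期计算保持一致
    static long period(long registerNameServerPeriod) {
        return Math.max(10_000, Math.min(registerNameServerPeriod, 60_000));
    }
}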

DefaultMessageStore#start:读取配置,启动核心线程

public void start() throws Exception {
    //	启动前先获取锁
    lock = lockFile.getChannel().tryLock(0, 1, false);
    // 获取文件锁失败,说明已有Broker实例正在使用该存储目录(MQ已经启动)
    if (lock == null || lock.isShared() || !lock.isValid()) {
        throw new RuntimeException("Lock failed,MQ already started");
    }
	//向锁文件写入lock标记并强制刷盘
    lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
    lockFile.getChannel().force(true);
    {
        //先用commitLog的最小物理偏移量作为maxPhysicalPosInLogicQueue的初始值
        long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
        //遍历topic
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                    //队列的最大物理偏移量为当前ConsumeQueue的最大物理偏移量
                    maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                }
            }
        }
        if (maxPhysicalPosInLogicQueue < 0) {
            maxPhysicalPosInLogicQueue = 0;
        }
        if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
            maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
            log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
        }
        log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
                 maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
        //设置分发消息到ConsumeQueue和IndexService的起始偏移量
        this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
        //启动分发服务
        this.reputMessageService.start();
		
        while (true) {
            // 不断检查,直到CommitLog的MaxOffset和ReputMessageService的ReputFromOffset差值小于等于零
            if (dispatchBehindBytes() <= 0) {
                break;
            }
            Thread.sleep(1000);
            log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
        }
        this.recoverTopicQueueTable();
    }

    //默认未开启DLeger commitlog,走这里
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        //启用高可用服务
        this.haService.start();
        //如果Broker是master,启动ScheduleMessageService,处理延迟消息,否则关闭
        this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
    }
	//将ConsumeQueue刷到磁盘
    this.flushConsumeQueueService.start();
    //启动commitLog服务
    this.commitLog.start();
    //启动统计服务
    this.storeStatsService.start();
	//创建${user.home}/store/abort
    this.createTempFile();
    //添加定时任务
    this.addScheduleTask();
    this.shutdown = false;
}

CommitLog#getMinOffset

public long getMinOffset() {
    //MappedFile类主要是持有文件相关的属性
    MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
    if (mappedFile != null) {
        //获取第一个有效文件的offset
        if (mappedFile.isAvailable()) {
            return mappedFile.getFileFromOffset();
        } else {
            return this.rollNextFile(mappedFile.getFileFromOffset());
        }
    }

    return -1;
}

reputMessageService.start启动的是ReputMessageService类中定义好的run方法,其核心是循环调用doReput方法:

从CommitLog读取消息发送至ConsumeQueue

private void doReput() {
    //如果分发的偏移量小于当前commitLog的最小偏移量
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                 this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
		//获取数据
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        //存在待reput的数据
        if (result != null) {
            try {
                //获取文件起始偏移量
                this.reputFromOffset = result.getStartOffset();

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            //调用DefaultMessageStore.doDispatch方法
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            //如果broker是master&&启用了长轮询
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                //分发消息到messageArrivingListener,唤醒等待的PullRequest接收消息
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }
                            // 累计更新Reput的起始字节偏移
                            this.reputFromOffset += size;
                            readSize += size;
                            //如果Broker是Slave,累计
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                            // 是BLANK,读到了MappedFile文件尾
                        } else if (size == 0) {
                            //切换到新的文件
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                        //检验消息出错 可能是commitlog文件损坏
                    } else if (!dispatchRequest.isSuccess()) {
                        //消息损坏但大小已知,跳过这条消息
                        if (size > 0) {
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        //size为0,可能读到了文件末尾的空白填充区,停止本轮分发
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                          this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                // 释放MappedFile的引用计数
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

首先看循环结束条件isCommitLogAvailable方法:

private boolean isCommitLogAvailable() {
    return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}

获取当前commitLog对象的最大偏移量:

public long getMaxOffset() {
    return this.mappedFileQueue.getMaxOffset();
}

public long getMaxOffset() {
    //获取最后一个文件
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        //返回物理偏移量+读指针位置
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

CommitLog.getData方法:

public SelectMappedBufferResult getData(final long offset) {
    return this.getData(offset, offset == 0);
}
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    //获取单个文件的大小 默认1024*1024*1024 = 1G
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    //根据偏移量获取mappedFile对象
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile != null) {
        //偏移量与文件大小取余
        int pos = (int) (offset % mappedFileSize);
        //读取数据到缓冲区
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }

    return null;
}

MappedFileQueue.findMappedFileByOffset方法:

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        //首先判断offset是否在当前MappedFileQueue的范围内
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                               offset,
                               firstMappedFile.getFileFromOffset(),
                               lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                               this.mappedFileSize,
                               this.mappedFiles.size());
            } else {
                //获取当前偏移量在MappedFileQueue中所对应文件的下标
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    //获取文件
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }
				//进一步检查
                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }
				//如果上面找到的不是目标文件,遍历所有的文件
                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }
			//没找到 是否返回第一个文件
            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}
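
上面通过偏移量直接换算文件下标的公式可以用一个小例子说明(示意代码,假设commitlog单个文件为默认的1G、第一个文件从偏移量0开始):

public class FindMappedFileIndexDemo {
    public static void main(String[] args) {
        long mappedFileSize = 1024L * 1024 * 1024;  // 单个commitlog文件大小:1G
        long firstFileFromOffset = 0L;              // 第一个文件的起始物理偏移量
        long offset = 2_684_354_560L;               // 待查找的偏移量:2.5G
        int index = (int) ((offset / mappedFileSize) - (firstFileFromOffset / mappedFileSize));
        System.out.println(index);                  // 输出2,即落在mappedFiles.get(2)对应的第三个文件
    }
}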

进入循环,调用CommitLog.checkMessageAndReturnSize方法封装DispatchRequest对象:

public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
    try {
        // 1 TOTAL SIZE
        int totalSize = byteBuffer.getInt();

        // 2 MAGIC CODE
        int magicCode = byteBuffer.getInt();
        switch (magicCode) {
            case MESSAGE_MAGIC_CODE:
                break;
            case BLANK_MAGIC_CODE:
                return new DispatchRequest(0, true /* success */);
            default:
                log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
                return new DispatchRequest(-1, false /* success */);
        }

        byte[] bytesContent = new byte[totalSize];

        int bodyCRC = byteBuffer.getInt();

        int queueId = byteBuffer.getInt();

        int flag = byteBuffer.getInt();

        long queueOffset = byteBuffer.getLong();

        long physicOffset = byteBuffer.getLong();

        int sysFlag = byteBuffer.getInt();

        long bornTimeStamp = byteBuffer.getLong();

        ByteBuffer byteBuffer1;
        if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }

        long storeTimestamp = byteBuffer.getLong();

        ByteBuffer byteBuffer2;
        if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }

        int reconsumeTimes = byteBuffer.getInt();

        long preparedTransactionOffset = byteBuffer.getLong();

        int bodyLen = byteBuffer.getInt();
        if (bodyLen > 0) {
            if (readBody) {
                byteBuffer.get(bytesContent, 0, bodyLen);

                if (checkCRC) {
                    int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
                    if (crc != bodyCRC) {
                        log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
                        return new DispatchRequest(-1, false/* success */);
                    }
                }
            } else {
                byteBuffer.position(byteBuffer.position() + bodyLen);
            }
        }

        byte topicLen = byteBuffer.get();
        byteBuffer.get(bytesContent, 0, topicLen);
        String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);

        long tagsCode = 0;
        String keys = "";
        String uniqKey = null;

        short propertiesLength = byteBuffer.getShort();
        Map<String, String> propertiesMap = null;
        if (propertiesLength > 0) {
            byteBuffer.get(bytesContent, 0, propertiesLength);
            String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
            propertiesMap = MessageDecoder.string2messageProperties(properties);

            keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);

            uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

            String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
            if (tags != null && tags.length() > 0) {
                tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
            }

            // Timing message processing
            {
                String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
                    int delayLevel = Integer.parseInt(t);

                    if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                    }

                    if (delayLevel > 0) {
                        tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                                                                                                                storeTimestamp);
                    }
                }
            }
        }

        int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);
        if (totalSize != readLength) {
            doNothingForDeadCode(reconsumeTimes);
            doNothingForDeadCode(flag);
            doNothingForDeadCode(bornTimeStamp);
            doNothingForDeadCode(byteBuffer1);
            doNothingForDeadCode(byteBuffer2);
            log.error(
                "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
                totalSize, readLength, bodyLen, topicLen, propertiesLength);
            return new DispatchRequest(totalSize, false/* success */);
        }

        return new DispatchRequest(
            topic,
            queueId,
            physicOffset,
            totalSize,
            tagsCode,
            storeTimestamp,
            queueOffset,
            keys,
            uniqKey,
            sysFlag,
            preparedTransactionOffset,
            propertiesMap
        );
    } catch (Exception e) {
    }

    return new DispatchRequest(-1, false /* success */);
}

上面这个方法的功能是从ByteBuffer中读取一条消息,消息的结构如下:

1 totalSize(4Byte) 消息大小
2 magicCode(4) 设置为daa320a7
3 bodyCRC(4) 当broker重启recover时会校验
4 queueId(4) 消息对应的consumeQueueId
5 flag(4) rocketmq不做处理,只存储后透传
6 queueOffset(8) 消息在consumeQueue中的偏移量
7 physicalOffset(8) 消息在commitlog中的偏移量
8 sysFlag(4) 事务标识,NOT_TYPE/PREPARED_TYPE/COMMIT_TYPE/ROLLBACK_TYPE
9 bornTimestamp(8) 消息产生端(producer)的时间戳
10 bornHost(8) 消息产生端(producer)地址(address:port)
11 storeTimestamp(8) 消息在broker存储时间
12 storeHostAddress(8) 消息存储到broker的地址(address:port)
13 reconsumeTimes(4) 消息重试次数
14 preparedTransactionOffset(8) 事务消息的物理偏移量
15 bodyLength(4) 消息长度,最长不超过4MB
16 body(body length Bytes) 消息体内容
17 topicLength(1) 主题长度,最长不超过255Byte
18 topic(topic length Bytes) 主题内容
19 propertiesLength(2) 消息属性长度,最长不超过65535Bytes
20 properties(properties length Bytes) 消息属性内容
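
按照这个结构,一条消息的总长度可以把各字段长度累加得出,思路与CommitLog中的calMsgLength一致。下面是一个简化的示意实现(仅为示意;当sysFlag标记了IPv6时,bornHost/storeHostAddress由8字节变为20字节):

public class MsgLengthSketch {

    public static int calMsgLength(boolean ipv6Host, int bodyLength, int topicLength, int propertiesLength) {
        int hostLength = ipv6Host ? 20 : 8; // 16字节IP+4字节端口 或 4字节IP+4字节端口
        return 4                    // 1 totalSize
            + 4                     // 2 magicCode
            + 4                     // 3 bodyCRC
            + 4                     // 4 queueId
            + 4                     // 5 flag
            + 8                     // 6 queueOffset
            + 8                     // 7 physicalOffset
            + 4                     // 8 sysFlag
            + 8                     // 9 bornTimestamp
            + hostLength            // 10 bornHost
            + 8                     // 11 storeTimestamp
            + hostLength            // 12 storeHostAddress
            + 4                     // 13 reconsumeTimes
            + 8                     // 14 preparedTransactionOffset
            + 4 + bodyLength        // 15/16 bodyLength + body
            + 1 + topicLength       // 17/18 topicLength + topic
            + 2 + propertiesLength; // 19/20 propertiesLength + properties
    }

    public static void main(String[] args) {
        // 例:IPv4、消息体100字节、topic为"TopicTest"(9字节)、属性30字节
        System.out.println(calMsgLength(false, 100, 9, 30)); // 输出230 = 84(固定部分) + 104 + 10 + 32
    }
}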

然后调用DefaultMessageStore.doDispatch方法:

public void doDispatch(DispatchRequest req) {
    //遍历分发器列表
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        //执行分发
        dispatcher.dispatch(req);
    }
}

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        //获取事务类型
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE://非事务消息
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE://事务消息已提交
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

dispatcherList是在DefaultMessageStore初始化时创建的:

this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
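
CommitLogDispatcher本身是一个很简单的回调接口,每条从commitlog重放出来的消息都会依次经过dispatcherList中的所有实现。下面写一个只做打印的自定义实现来说明这个扩展点(仅为示意,并非RocketMQ自带的类;接口与DispatchRequest即上文所示的类型,省略import):

class CommitLogDispatcherPrintDemo implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        //每条分发出来的消息都会回调到这里,真实实现在这里构建ConsumeQueue或Index
        System.out.printf("dispatch topic=%s queueId=%d size=%d%n",
            request.getTopic(), request.getQueueId(), request.getMsgSize());
    }
}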

先看CommitLogDispatcherBuildConsumeQueue中的实现:当事务类型为TRANSACTION_NOT_TYPE或TRANSACTION_COMMIT_TYPE时,调用DefaultMessageStore.putMessagePositionInfo方法:

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    //根据topic和queueid查找ConsumeQueue,不存在则直接创建
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

调用ConsumeQueue.putMessagePositionInfoWrapper方法:

public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30;
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        //是否额外写入信息 默认false
        if (isExtWriteEnable()) {
            //计算扩展的偏移量
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());
			
            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            //判断是否可以扩展
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                         topic, queueId, request.getCommitLogOffset());
            }
        }
        //调用putMessagePositionInfo
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                                                     request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        //写入成功
        if (result) {
            if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
            }
            //设置StoreCheckpoint为消息存储时间
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {
            //写入失败 重试
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                     + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me 重试无效 记录日志
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}

ConsumeQueue.putMessagePositionInfo:

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
                                       final long cqOffset) {
	//判断是否已经处理过了
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }
	//重置索引缓冲区
    this.byteBufferIndex.flip();
    //ConsumeQueue单元大小,20个字节
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    //消息在CommitLog的偏移量 8个字节
    this.byteBufferIndex.putLong(offset);
    //消息的大小 4个字节
    this.byteBufferIndex.putInt(size);
    //tagsCode(消息tag的哈希值,延迟消息为投递时间,扩展存储时为扩展地址) 8个字节
    this.byteBufferIndex.putLong(tagsCode);
	//预期的队列逻辑偏移量 cqOffset:该队列已记录的消息条数  CQ_STORE_UNIT_SIZE:每条索引占用的字节数(20)
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
	//获取存储消息信息的mappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
		//如果是第一个文件 且mappedFile还没有被写过
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                     + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            //当前的实际逻辑偏移量
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
			//期望的逻辑偏移量小于实际的逻辑偏移量 表明重复写入直接返回
            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                         expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }
			//如果期望的逻辑偏移量不等于实际的逻辑偏移量 可能是个bug
            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        //更新maxPhysicOffset,并将暂存在ByteBuffer中的消息偏移信息,追加到MappedFile中
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}
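
可以看到,ConsumeQueue的一个索引单元固定20字节:commitlog物理偏移量(8字节)+消息大小(4字节)+tagsCode(8字节)。下面用一小段示意代码还原这个单元的构造和expectLogicOffset的计算(仅为示意):

import java.nio.ByteBuffer;

public class CqUnitSketch {

    //构造一个20字节的ConsumeQueue索引单元
    public static byte[] buildUnit(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(20);
        buf.putLong(commitLogOffset); // 消息在commitlog中的物理偏移量
        buf.putInt(msgSize);          // 消息大小
        buf.putLong(tagsCode);        // tag哈希值或扩展地址
        return buf.array();
    }

    public static void main(String[] args) {
        //第100条消息(cqOffset=100)在ConsumeQueue文件中的期望写入位置
        long expectLogicOffset = 100L * 20;
        System.out.println(expectLogicOffset);                // 2000
        System.out.println(buildUnit(1024L, 256, 0L).length); // 20
    }
}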

MappedFile.appendMessage:

public boolean appendMessage(final byte[] data) {
    //获取当前写指针的位置
    int currentPos = this.wrotePosition.get();
	//追加数据
    if ((currentPos + data.length) <= this.fileSize) {
        try {
            this.fileChannel.position(currentPos);
            this.fileChannel.write(ByteBuffer.wrap(data));
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        //移动写指针
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}

CommitLogDispatcherBuildIndex.dispatch

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    //IndexFile是另一种形式的索引文件 可以根据消息的id等查询消息
    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

回到DefaultMessageStore.start方法,启动一个循环直到消息分发完毕:

public long dispatchBehindBytes() {
    return this.reputMessageService.behind();
}

public long behind() {
    //commitLog最大偏移减去当前偏移
    return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
}

接着调用DefaultMessageStore.recoverTopicQueueTable方法:

//消费队列 
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

public void recoverTopicQueueTable() {
    HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
    long minPhyOffset = this.commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            table.put(key, logic.getMaxOffsetInQueue());
            //矫正最小偏移量
            logic.correctMinOffset(minPhyOffset);
        }
    }

    this.commitLog.setTopicQueueTable(table);
}

然后在非DLeger模式下会启动高可用(HA)服务。

接着执行flushConsumeQueueService.start()方法,启动ConsumeQueue刷盘线程,其核心逻辑在doFlush方法中:

private void doFlush(int retryTimes) {
    // 默认2
    int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
    // 关闭时传入
    if (retryTimes == RETRY_TIMES_OVER) {
        // 强制Flush
        flushConsumeQueueLeastPages = 0;
    }
    long logicsMsgTimestamp = 0;
    // 默认60秒
    int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
    long currentTimeMillis = System.currentTimeMillis();
    // 离上次刷盘时间超出了间隔
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
        // 更新刷盘时间戳
        this.lastFlushTimestamp = currentTimeMillis;
        // 强制Flush
        flushConsumeQueueLeastPages = 0;
        // 更新检查点
        logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
    }
    ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
    // 遍历Topic
    for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
        // 遍历ConsumeQueue
        for (ConsumeQueue cq : maps.values()) {
            boolean result = false;
            for (int i = 0; i < retryTimes && !result; i++) {
                // 刷盘
                result = cq.flush(flushConsumeQueueLeastPages);
            }
        }
    }
    // 如果是强制刷盘
    if (0 == flushConsumeQueueLeastPages) {
        if (logicsMsgTimestamp > 0) {
            DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
        }
        // 检查点也要刷盘
        DefaultMessageStore.this.getStoreCheckpoint().flush();
    }
}

然后调用ConsumeQueue.flush方法:

public boolean flush(final int flushLeastPages) {
    //队列映射文件刷新
    boolean result = this.mappedFileQueue.flush(flushLeastPages);
    //存在consume_ext文件 也要刷盘
    if (isExtReadEnable()) {
        result = result & this.consumeQueueExt.flush(flushLeastPages);
    }

    return result;
}

MappedFileQueue.flush方法:

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    //根据偏移量找到MappedFile对象
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        //更新flushedWhere
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

调用MappedFile.flush:

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            //获取读指针偏移量
            int value = getReadPosition();

            try {
                //追加数据到fileChannel或者mappedByteBuffer
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

刷盘机制

回到DefaultMessageStore.start()方法,ConsumeQueue刷盘启动后,调用CommitLog.start方法,启动CommitLog:

 public void start() {
     this.flushCommitLogService.start();

     if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
         this.commitLogService.start();
     }
 }

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:同步刷盘和异步刷盘。

(图:同步刷盘与异步刷盘)

FlushCommitLogService拥有两个实现类:GroupCommitService和FlushRealTimeService,分别对应同步刷盘和异步刷盘。

提交刷盘请求

GroupCommitService中有两个List,用于刷盘时解耦:

private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

public static class GroupCommitRequest {
    private final long nextOffset;
    private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
    private final long startTimestamp = System.currentTimeMillis();
    private long timeoutMillis = Long.MAX_VALUE;

    public GroupCommitRequest(long nextOffset, long timeoutMillis) {
        this.nextOffset = nextOffset;
        this.timeoutMillis = timeoutMillis;
    }

    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }


    public long getNextOffset() {
        return nextOffset;
    }

    public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
        this.flushOKFuture.complete(putMessageStatus);
    }

    public CompletableFuture<PutMessageStatus> future() {
        return flushOKFuture;
    }

}

GroupCommitRequest对象在CommitLog.handleDiskFlush方法中被创建,当Broker接收到Producer的消息后会调用此方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // 同步刷盘
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        //获取刷盘服务
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            //封装刷盘请求 获取写指针位置以及字节数计算出nextOffset
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            //提交刷盘请求
            service.putRequest(request);
            //获取请求结果
            CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
            PutMessageStatus flushStatus = null;
            try {
                //阻塞五秒,等待刷盘结束
                flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                                                TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                //flushOK=false;
            }
            if (flushStatus != PutMessageStatus.PUT_OK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                          + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // 异步刷盘
    else {
        //未开启堆外内存
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            //开启了堆外内存
            commitLogService.wakeup();
        }
    }
}

调用GroupCommitService.putRequest方法:

public synchronized void putRequest(final GroupCommitRequest request) {
    //加锁添加请求
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    //唤醒线程
    this.wakeup();
}


public void wakeup() {
    //原子操作 将hasNotified的值从false改为true
    if (hasNotified.compareAndSet(false, true)) {
        //计数器减一 
        //protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
        //唤醒刷盘线程
        waitPoint.countDown(); // notify
    }
}
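
wakeup需要和刷盘线程run方法里的waitForRunning配合理解:刷盘线程没有被唤醒时最多等待一个间隔时间,醒来后先执行onWaitEnd(GroupCommitService在这里交换读写队列),再去做刷盘。下面是这套通知-等待协作的简化示意(用CountDownLatch代替CountDownLatch2,仅演示思路,并非源码):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class WaitNotifySketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);

    //对应wakeup():只负责把"有新任务"这个事实告诉刷盘线程
    public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown();
        }
    }

    //对应waitForRunning(interval):没有通知就最多等interval毫秒,醒来后执行一次onWaitEnd
    public void waitForRunning(long interval) throws InterruptedException {
        if (hasNotified.compareAndSet(true, false)) {
            onWaitEnd();
            return;
        }
        waitPoint = new CountDownLatch(1); // CountDownLatch2可以reset,这里用新建代替
        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } finally {
            hasNotified.set(false);
            onWaitEnd();
        }
    }

    //GroupCommitService在这个钩子里交换requestsWrite/requestsRead
    protected void onWaitEnd() {
    }
}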

同步刷盘

(图:同步刷盘流程)

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

GroupCommitService.run():

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            //调用waitForRunning方法
            this.waitForRunning(10);
            //调用doCommit方法
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}
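
run方法里反复用到的swapRequests和doCommit没有贴出来:swapRequests把读写两个列表互换(putRequest只往requestsWrite里加,doCommit只消费requestsRead);doCommit逐个检查请求期望的偏移量是否已经刷盘,必要时触发刷盘,并通过wakeupCustomer唤醒等待在future上的发送线程。它们的大致逻辑如下(示意代码,依据4.7.x前后的源码整理,细节以实际阅读的版本为准):

private void swapRequests() {
    //读写队列互换,写入方与刷盘方互不阻塞
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}

private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                //消息可能跨到了下一个文件,所以最多刷两次
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    //已刷盘位置是否已经覆盖了该请求期望的位置
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
                //唤醒等待在flushOKFuture上的发送线程
                req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }

            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            //消息设置了不等待刷盘(isWaitStoreMsgOK为false)时走到这里,直接刷一次盘
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}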

异步刷盘

(图:异步刷盘流程)

开启transientStorePoolEnable后异步刷盘步骤:

  1. 将消息直接追加到堆外内存ByteBuffer(writeBuffer),wrotePosition指针向后移动
  2. CommitRealTimeService线程默认每隔200ms将ByteBuffer中新追加的内容提交(commit)到FileChannel
  3. FileChannel在page cache中累积已提交的内容
  4. commit操作成功返回,committedPosition指针推进到本次提交的位置
  5. FlushRealTimeService线程默认每500ms将FileChannel中新写入的数据刷写(force)到磁盘

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

  1. 未开启堆外内存时,FlushRealTimeService.run:

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            //是否使用定时刷盘
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    
            //间隔时间 默认500毫秒
            int interval =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            //刷盘页数 默认4页
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    		//彻底刷盘时间间隔 默认10s
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    
            //打印刷盘进度
            boolean printFlushProgress = false;
    
            // Print flush progress 获取系统当前时间
            long currentTimeMillis = System.currentTimeMillis();
            //系统当前时间大于等于最后一次刷盘时间点+彻底刷盘时间间隔 则进行一次彻底刷盘
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }
    
            try {
                //启用定时刷盘
                if (flushCommitLogTimed) {
                    //休眠500毫秒
                    Thread.sleep(interval);
                } else {
                    //调用waitForRunning方法 刷盘线程是否被唤醒 进行500毫秒的阻塞
                    this.waitForRunning(interval);
                }
    
                //打印刷盘进程
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
    
                long begin = System.currentTimeMillis();
                //调用flush方法 开始刷盘
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    //更新刷盘时间点
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
    
        // Normal shutdown, to ensure that all the flush before exit
        //正常关闭的情况下 完成刷盘操作
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
    
        this.printFlushProgress();
    
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    
    

    与同步刷盘不同的是,异步刷盘会传入页数:

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
    
        return result;
    }
    
    

    MappedFile.flush方法中:

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            ...
        }
    
    

    调用isAbleToFlush方法,判断是否进行刷盘:

    private boolean isAbleToFlush(final int flushLeastPages) {
        int flush = this.flushedPosition.get();
        int write = getReadPosition();
    	//写指针的位置==当前文件的大小
        if (this.isFull()) {
            return true;
        }
    	//最少刷盘页数大于0(同步刷盘不存在这种情况)
        if (flushLeastPages > 0) {
            //异步刷盘必须满足大于等于最小刷盘页数
            //public static final int OS_PAGE_SIZE = 1024 * 4;
    	   // 即在默认情况下 每次刷盘的数据量大于等于 4 * 4K
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        }
    
        return write > flush;
    }
    
    
  2. 开启堆外内存时,CommitRealTimeService.run:

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            //间隔时间,默认200ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
    		//一次提交的至少页数
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
    		//两次真实提交的最大间隔,默认200ms
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
    
            //距离上次提交的时间超过commitDataThoroughInterval,则忽略commitDataLeastPages参数,直接提交
            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }
    
            try {
                //执行提交操作,将待提交数据提交到物理文件的内存映射区
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    //唤醒刷盘线程
                    flushCommitLogService.wakeup();
                }
    
                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                //调用waitForRunning方法
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }
    
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    
    

刷盘操作

调用MappedFile.commit方法:

public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //开启堆外内存的情况下writeBuffer != null;未开启时无需commit,直接把wrotePosition视作committedPosition
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    
    //判断是否满足提交条件
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    //所有的数据都被写入 回收writeBuffer
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

MappedFile.isAbleToCommit

protected boolean isAbleToCommit(final int commitLeastPages) {
    //已提交位置的指针
    int flush = this.committedPosition.get();
    //已写入位置的指针
    int write = this.wrotePosition.get();

    if (this.isFull()) {
        return true;
    }

    if (commitLeastPages > 0) {
        //待提交的数据必须凑满commitLeastPages页才提交
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
    }

    return write > flush;
}
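
以默认的4K页大小为例,只有当待提交(或待刷盘)的数据凑够指定页数时才会真正动手。下面的小例子演示这个页数判断(仅为示意):

public class AbleToCommitDemo {
    public static void main(String[] args) {
        final int OS_PAGE_SIZE = 1024 * 4; // 4K一页
        int write = 20 * 1024;             // wrotePosition:已写入20K,即5页
        int flush = 0;                     // committedPosition:尚未提交
        int leastPages = 4;                // commitCommitLogLeastPages默认4
        boolean able = ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= leastPages;
        System.out.println(able);          // true:5 - 0 >= 4
    }
}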

MappedFile.commit0:

//将writeBuffer中的数据写入fileChannel中 更新指针的位置
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - lastCommittedPosition > commitLeastPages) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

主从复制HA

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。

(1)同步复制

同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态;

在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。

(2)异步复制

异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。

在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;

(3)配置

同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。
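
结合前面的刷盘机制,一个broker.conf片段大致如下(仅为示意,取值按需调整):

# 复制方式:ASYNC_MASTER / SYNC_MASTER / SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式:SYNC_FLUSH / ASYNC_FLUSH
flushDiskType=SYNC_FLUSH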

Master

HAService#start 服务启动:

public void start() throws Exception {
    // 打开ServerSocketChannel,监听端口,注册OP_ACCEPT事件
    this.acceptSocketService.beginAccept();
    // 启动AcceptSocketService,接收Slave的连接请求
    this.acceptSocketService.start();
     // 启动GroupTransferService,处理同步Master情况下的主从同步,只是起检查通知的作用
    this.groupTransferService.start();
    // 启动HAClient,主动连接Master
    this.haClient.start();
}

HAService$AcceptSocketService#beginAccept:初始化NIO服务端,监听HA端口并注册OP_ACCEPT事件

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

HAService$AcceptSocketService#run, the server-side accept loop:

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            //fetch the ready selection keys
            Set<SelectionKey> selected = this.selector.selectedKeys();

            if (selected != null) {
                //iterate over the ready events
                for (SelectionKey k : selected) {
                    //accept event
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        //accept the connection and obtain the slave's SocketChannel
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                        if (sc != null) {
                            HAService.log.info("HAService receive new connection, "
                                               + sc.socket().getRemoteSocketAddress());

                            try {
                                //create an HAConnection object for this slave connection
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                conn.start();
                                HAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }

                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

HAConnection#start, starting the HA connection:

 public void start() {
     //reads the offset reports sent by the slave
     this.readSocketService.start();
     //pushes CommitLog data to the slave
     this.writeSocketService.start();
 }

HAConnection$ReadSocketService#run:

public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            //process the read event
            boolean ok = this.processReadEvent();
            //processing failed, close the connection
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }

            //calculate how long it has been since the last read
            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            //no report received from the slave within the timeout, close the connection
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    //the loop has exited: stop the services and release resources
    this.makeStop();

    writeSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    HAConnection.this.haService.getConnectionCount().decrementAndGet();

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

Read handling on the master

HAConnection$ReadSocketService#processReadEvent handles read events, i.e. the slave's offset reports:

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    //the read buffer has no space left
    if (!this.byteBufferRead.hasRemaining()) {
        //reset the indexes and start over
        this.byteBufferRead.flip();
        this.processPosition = 0;
    }

    //keep going while the read buffer still has space
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // read data from the SocketChannel into the read buffer
            int readSize = this.socketChannel.read(this.byteBufferRead);
            //some data was read
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                //update the last-read timestamp
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // write index of the read buffer minus the last processed position >= 8 bytes means a full slave offset report has arrived
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    //several reports may be stacked; jump to the last complete 8-byte slot
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    // read the slave's CommitLog sync offset, 8 bytes
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    //update the processed position
                    this.processPosition = pos;

                    //the CommitLog offset acknowledged by the slave
                    HAConnection.this.slaveAckOffset = readOffset;
                    //first report received from this slave
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        //record the offset the slave requested to start syncing from
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    }
                    // pass the acknowledged offset to HAService so GroupTransferService can decide whether blocked sync requests may be released
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                //retry at most three times
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}

HAService#notifyTransferSome: advances the replicated-offset watermark:

public void notifyTransferSome(final long offset) {
    // the offset acknowledged by the slave > the largest offset acknowledged so far
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        // try to advance the watermark
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            // notify GroupTransferService that more data has been replicated
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            // lost the CAS race, re-read the current watermark and retry
            value = this.push2SlaveMaxOffset.get();
        }
    }
}
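
push2SlaveMaxOffset is advanced with a classic CAS loop so concurrent acks can never move it backwards. A minimal sketch of the same pattern on a plain AtomicLong (illustrative, not RocketMQ code):

import java.util.concurrent.atomic.AtomicLong;

public class MonotonicMax {
    private final AtomicLong max = new AtomicLong(0);

    // only moves the value forward, no matter how many threads call it concurrently
    void advance(long offset) {
        for (long current = max.get(); offset > current; ) {
            if (max.compareAndSet(current, offset)) {
                // this is where HAService would call groupTransferService.notifyTransferSome()
                break;
            }
            current = max.get();   // lost the race, re-read and retry
        }
    }

    public static void main(String[] args) {
        MonotonicMax m = new MonotonicMax();
        m.advance(100);
        m.advance(50);                     // ignored: smaller than the current max
        System.out.println(m.max.get());   // 100
    }
}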

Write handling on the master

HAConnection$WriteSocketService#run:

public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            // initial value -1: no sync request has been received from the slave yet
            if (-1 == HAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }

            // still the initial value, decide where to start transferring from
            if (-1 == this.nextTransferFromWhere) {
                // the slave reported offset 0, i.e. it has never synced before
                if (0 == HAConnection.this.slaveRequestOffset) {
                    // current maximum physical offset of the master's CommitLog
                    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();

                    // round down to the start of the last CommitLog file
                    masterOffset =
                        masterOffset
                        - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                           .getMappedFileSizeCommitLog());

                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }

                    // the next transfer starts at the beginning of the last CommitLog file
                    this.nextTransferFromWhere = masterOffset;
                } else {
                    // otherwise start from the offset the slave asked for
                    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                }

                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                         + "], and slave request " + HAConnection.this.slaveRequestOffset);
            }

            // the previous write to the slave has completed
            if (this.lastWriteOver) {

                // time since the last write
                long interval =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

                // the heartbeat interval has elapsed, send a heartbeat to the slave
                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaSendHeartbeatInterval()) {

                    // Build Header
                    // heartbeat packet: header only, no body
                    // reset the header buffer's write index
                    this.byteBufferHeader.position(0);
                    // header size is 12 bytes
                    this.byteBufferHeader.limit(headerSize);
                    // offset, 8 bytes
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    // body size, 4 bytes (0 for a heartbeat)
                    this.byteBufferHeader.putInt(0);
                    // switch to read mode
                    this.byteBufferHeader.flip();

                    // transfer the data/heartbeat to the slave
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            } else {
                // the previous write is not finished yet, keep writing
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }
            // fetch CommitLog data starting at the transfer offset
            SelectMappedBufferResult selectResult =
                HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);

            if (selectResult != null) {
                // size of the CommitLog data
                int size = selectResult.getSize();
                // cap it at the configured transfer batch size
                if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }

                // starting offset of this transfer
                long thisOffset = this.nextTransferFromWhere;
                // starting offset of the next transfer
                this.nextTransferFromWhere += size;

                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;

                // Build Header: wrap the batch in a message header
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();
                // transfer the data to the slave
                this.lastWriteOver = this.transferData();
            } else {
                // wait until new messages arrive in the CommitLog or the wait times out
                HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {

            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    // the service is stopping, release resources
    HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }

    this.makeStop();

    readSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}
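
Every transfer frame the master sends starts with a 12-byte header: an 8-byte starting offset plus a 4-byte body length (0 for a pure heartbeat). A small encode/decode sketch of that header (illustrative; class name and values are made up):

import java.nio.ByteBuffer;

public class TransferHeaderDemo {
    static final int HEADER_SIZE = 8 + 4;   // phyoffset + body size

    static ByteBuffer encode(long nextTransferFromWhere, int bodySize) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.putLong(nextTransferFromWhere);   // where this batch starts in the CommitLog
        header.putInt(bodySize);                 // 0 means "heartbeat, no body follows"
        header.flip();                           // switch to read mode before writing to the channel
        return header;
    }

    public static void main(String[] args) {
        ByteBuffer header = encode(1073741824L, 32 * 1024);
        System.out.println("offset=" + header.getLong() + ", bodySize=" + header.getInt());
    }
}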

HAConnection$WriteSocketService#transferData, transferring data to the slave:

private boolean transferData() throws Exception {
    int writeSizeZeroTimes = 0;
    // Write Header
    // write the header into the SocketChannel until it has been fully sent
    while (this.byteBufferHeader.hasRemaining()) {
        int writeSize = this.socketChannel.write(this.byteBufferHeader);
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            // update the last-write timestamp
            this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        } else if (writeSize == 0) {
            // retry at most three times
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        } else {
            throw new Exception("ha master write header error < 0");
        }
    }

    // no body to send (heartbeat only)
    if (null == this.selectMappedBufferResult) {
        return !this.byteBufferHeader.hasRemaining();
    }

    writeSizeZeroTimes = 0;

    // Write Body: only once the header has been fully written
    if (!this.byteBufferHeader.hasRemaining()) {
        // until the CommitLog data of this batch has been fully written
        while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            } else if (writeSize == 0) {
                // retry at most three times
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                throw new Exception("ha master write body error < 0");
            }
        }
    }

    // both header and body have been fully written
    boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

    // the body has been fully written
    if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
        // release the mapped buffer
        this.selectMappedBufferResult.release();
        this.selectMappedBufferResult = null;
    }

    return result;
}

Slave

HAService$HAClient#run, the client main loop:

public void run() {
    while (!this.isStopped()) {
        try {
            // connected to the master successfully
            if (this.connectMaster()) {
                // the interval since the last offset report to the master has expired, 5s by default
                if (this.isTimeToReportOffset()) {
                    // report the slave's current maximum CommitLog offset to the master; this also serves as a heartbeat
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }
                // block until a read event is ready or the timeout elapses
                this.selector.select(1000);
                // process read events, i.e. the CommitLog data pushed by the master
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                // if the slave's maximum CommitLog offset is now larger than the last reported one, this round of sync succeeded, so report it to the master
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }
                // time since the last write to the master
                long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                // the connection is considered expired, close it
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                    this.closeMaster();
                }
            } else {
                // wait and try again later
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            this.waitForRunning(1000 * 5);
        }
    }
}

HAService$HAClient#connectMaster, connecting to the master:

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // connect to the master, returns a SocketChannel
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // interested in readable events only
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        // the slave's current maximum physical CommitLog offset
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}
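
connectMaster delegates to RemotingUtil, but the underlying steps are plain NIO: open a SocketChannel, connect to the master's HA port, switch to non-blocking mode and register OP_READ. A bare-bones sketch under those assumptions (host and port are placeholders; 10912 is only the default HA listen port, and the demo expects something to be listening there):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class HaClientConnectDemo {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        // HA traffic uses its own port (haListenPort, 10912 by default), not the broker's 10911
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 10912));
        socketChannel.configureBlocking(false);
        // from now on the client only reacts to data pushed by the master
        socketChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("connected: " + socketChannel.isConnected());
        socketChannel.close();
        selector.close();
    }
}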

Write handling on the slave

HAService$HAClient#reportSlaveMaxOffset reports the slave's current maximum CommitLog offset:

private boolean reportSlaveMaxOffset(final long maxOffset) {
    //reset the write index
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    //write the maximum offset
    this.reportOffset.putLong(maxOffset);
    //reset the read index
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            //report the offset to the master
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                      + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    //update the last-write timestamp
    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    //return whether the buffer has been fully sent
    return !this.reportOffset.hasRemaining();
}

Read handling on the slave

HAService$HAClient#processReadEvent

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    // while the read buffer still has space
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // read data from the SocketChannel into the read buffer
            int readSize = this.socketChannel.read(this.byteBufferRead);
            // some data was read
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                // dispatch the received frames
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    return false;
                }
            }
            // no data read
            else if (readSize == 0) {
                // give up after three empty reads in a row
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                return false;
            }
        } catch (IOException e) {
            return false;
        }
    }
    return true;
}

HAService$HAClient#dispatchReadRequest dispatches the received frames:

private boolean dispatchReadRequest() {
    // header size: 12 bytes
    final int msgHeaderSize = 8 + 4; // phyoffset + size
    // current write index of the read buffer
    int readSocketPos = this.byteBufferRead.position();

    while (true) {
        // dispatchPosition starts at 0 and marks where the current frame begins in the read buffer;
        // frames accumulate in the buffer instead of being cleared one by one after processing
        // new data has arrived since the previous frame was handled
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        // a complete header is available
        if (diff >= msgHeaderSize) {
            // read the 8-byte starting physical offset of the CommitLog chunk pushed by the master
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            // read the body size
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            // the slave's current maximum physical CommitLog offset
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            if (slavePhyOffset != 0) {
                // incremental sync: the slave's maximum offset must match the chunk's starting offset
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                              + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    // inconsistent offsets, close the connection
                    return false;
                }
            }

            // a complete header plus body is available
            if (diff >= (msgHeaderSize + bodySize)) {
                // allocate an array to hold the body
                byte[] bodyData = new byte[bodySize];
                // move the read index to the start of the body
                this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);

                // copy the body out of the buffer
                this.byteBufferRead.get(bodyData);

                // append the CommitLog chunk pushed by the master to the slave's CommitLog
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                // restore the read buffer's write index
                this.byteBufferRead.position(readSocketPos);

                // advance the dispatch position
                this.dispatchPosition += msgHeaderSize + bodySize;

                // if the slave's maximum CommitLog offset is now larger than the last reported one, this round of sync succeeded, so report it to the master
                if (!reportSlaveMaxOffsetPlus()) {
                    // reporting failed, close the connection
                    return false;
                }

                continue;
            }
        }

        // the read buffer is full
        if (!this.byteBufferRead.hasRemaining()) {
            // reallocate/compact the buffer
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}
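
dispatchReadRequest is a standard "accumulate then frame" parser: frames stay in the read buffer until a complete header plus body has arrived, and dispatchPosition marks how far parsing has progressed. The sketch below replays that logic against an in-memory buffer (illustrative only; it prints each decoded frame instead of appending to a CommitLog):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameParserDemo {
    static final int HEADER_SIZE = 8 + 4;   // phyoffset + size, same layout as the HA protocol

    public static void main(String[] args) {
        ByteBuffer byteBufferRead = ByteBuffer.allocate(1024);
        // simulate two frames arriving back to back from the master
        putFrame(byteBufferRead, 0L, "hello".getBytes(StandardCharsets.UTF_8));
        putFrame(byteBufferRead, 5L, "world!".getBytes(StandardCharsets.UTF_8));

        int dispatchPosition = 0;
        int readSocketPos = byteBufferRead.position();   // remember the write index
        while (byteBufferRead.position() - dispatchPosition >= HEADER_SIZE) {
            long masterPhyOffset = byteBufferRead.getLong(dispatchPosition);
            int bodySize = byteBufferRead.getInt(dispatchPosition + 8);
            if (byteBufferRead.position() - dispatchPosition < HEADER_SIZE + bodySize) {
                break;   // body not complete yet, wait for more data
            }
            byte[] body = new byte[bodySize];
            byteBufferRead.position(dispatchPosition + HEADER_SIZE);
            byteBufferRead.get(body);
            byteBufferRead.position(readSocketPos);      // restore the write index
            dispatchPosition += HEADER_SIZE + bodySize;
            System.out.println("offset=" + masterPhyOffset + ", body=" + new String(body, StandardCharsets.UTF_8));
        }
    }

    static void putFrame(ByteBuffer buf, long offset, byte[] body) {
        buf.putLong(offset).putInt(body.length).put(body);
    }
}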

HAService$HAClient#reportSlaveMaxOffsetPlus reports progress to the master after a successful sync:

private boolean reportSlaveMaxOffsetPlus() {
    boolean result = true;
    long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    // the slave's maximum physical CommitLog offset > the offset last reported to the master
    if (currentPhyOffset > this.currentReportedOffset) {
        // this round of sync succeeded, update the offset to report next
        this.currentReportedOffset = currentPhyOffset;
        // report it to the master right away
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
            this.closeMaster();
        }
    }
    return result;
}

Synchronous master-slave replication

CommitLog#putMessage, where a message is received:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // the message is appended to the CommitLog's MappedFile (omitted)
    // handle flushing to disk
    handleDiskFlush(result, putMessageResult, msg);
    // handle HA: a SYNC_MASTER waits until the slave has received the data before returning; an ASYNC_MASTER leaves replication to the HAService threads
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}

CommitLog#handleHA handles replication in the synchronous case:

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // only executed when the broker is configured as a synchronous master
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        // the producer asked to wait for the store result
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            // there is a connected slave && the slave is no more than 1024 * 1024 * 256 = 256MB behind the master
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                // create a group commit request
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                // put it on the GroupTransferService queue inside HAService
                service.putRequest(request);
                // wake up the HAConnection WriteSocketService threads so they push the new CommitLog data to the slaves
                service.getWaitNotifyObject().wakeupAll();
                PutMessageStatus replicaStatus = null;
                try {
                    // block until the GroupTransferService thread signals completion or the sync flush timeout expires
                    replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                                                         TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                }
                // replication failed, tell the producer the slave sync timed out
                if (replicaStatus != PutMessageStatus.PUT_OK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                              + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem: no usable slave, tell the producer the slave is not available
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }

}
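
The key to synchronous replication is that the sending thread parks on the GroupCommitRequest until GroupTransferService confirms (or gives up on) the slave ack. A stripped-down sketch of that handshake using a CompletableFuture, mirroring the request.future().get(...) call above (class and method names here are simplified stand-ins, not the real CommitLog types):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncReplicaWaitDemo {
    // stand-in for CommitLog.GroupCommitRequest
    static class GroupCommitRequest {
        final long nextOffset;
        final CompletableFuture<Boolean> future = new CompletableFuture<>();
        GroupCommitRequest(long nextOffset) { this.nextOffset = nextOffset; }
        void wakeupCustomer(boolean transferOk) { future.complete(transferOk); }
    }

    public static void main(String[] args) throws Exception {
        GroupCommitRequest request = new GroupCommitRequest(1024L);

        // pretend this is GroupTransferService noticing push2SlaveMaxOffset >= nextOffset
        new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            request.wakeupCustomer(true);
        }).start();

        try {
            // the producer-facing thread blocks here, bounded by syncFlushTimeout (5s by default)
            boolean ok = request.future.get(5000, TimeUnit.MILLISECONDS);
            System.out.println(ok ? "PUT_OK" : "FLUSH_SLAVE_TIMEOUT");
        } catch (TimeoutException e) {
            System.out.println("FLUSH_SLAVE_TIMEOUT");
        }
    }
}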

HAService$GroupTransferService#putRequest submits a synchronous-replication request:

/**
 * Requests are first added to the write queue; before each processing round the read and write queues are swapped,
 * then requests are taken from the read queue, which is cleared afterwards. This avoids contention between threads.
 * CommitLog's inner class GroupCommitService uses the same pattern.
 */
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
    // lock the write queue, because swapRequests swaps the write and read queues
    synchronized (this.requestsWrite) {
        // add the request to the write queue
        this.requestsWrite.add(request);
    }
    // when a new request arrives, wake up the GroupTransferService thread if it has not been woken yet
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}
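
The "write to one list, swap, then drain the other" trick keeps the draining thread out of contention with producers. A compact sketch of the pattern (illustrative and simplified; it uses a single lock on `this` instead of ServiceThread's per-queue locks):

import java.util.ArrayList;
import java.util.List;

public class SwapQueueDemo {
    private volatile List<String> requestsWrite = new ArrayList<>();
    private volatile List<String> requestsRead = new ArrayList<>();

    // producers append to the write list under a short lock
    void putRequest(String request) {
        synchronized (this) {
            requestsWrite.add(request);
        }
    }

    // the worker swaps the two lists, then drains the read list without blocking producers
    void swapAndDrain() {
        synchronized (this) {
            List<String> tmp = requestsWrite;
            requestsWrite = requestsRead;
            requestsRead = tmp;
        }
        for (String req : requestsRead) {
            System.out.println("processing " + req);
        }
        requestsRead.clear();
    }

    public static void main(String[] args) {
        SwapQueueDemo demo = new SwapQueueDemo();
        demo.putRequest("offset-100");
        demo.putRequest("offset-200");
        demo.swapAndDrain();
    }
}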

HAService$GroupTransferService#run

public void run() {
    while (!this.isStopped()) {
        try {
            // if the notified flag is already set (a request arrived), reset it and swap the read/write queues;
            // otherwise block waiting to be woken (or time out), then reset the flag and swap the queues
            this.waitForRunning(10);
            // wait for the transfers to complete
            this.doWaitTransfer();
        } catch (Exception e) {
        }
    }
}

HAService$GroupTransferService#doWaitTransfer:

private void doWaitTransfer() {
    // lock the read queue
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            // iterate over the pending replication requests
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                // the maximum offset acknowledged by the slave >= the end offset of the requested message
                // means the transfer has completed
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                // retry up to 5 times or until the transfer completes
                for (int i = 0; !transferOK && i < 5; i++) {
                    // block for up to 1 second, or until the HAConnection ReadSocketService thread wakes us after receiving a slave ack
                    this.notifyTransferObject.waitForRunning(1000);
                    // check again
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }
                // ...
                // wake up the broker thread that submitted the request, telling it whether the sync succeeded
                req.wakeupCustomer(transferOK);
            }
            // clear the read queue once everything has been processed
            this.requestsRead.clear();
        }
    }
}

HAConnection$ReadSocketService#processReadEvent

private boolean processReadEvent() {
    // ...
    // while the read buffer still has space
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // read data from the SocketChannel into the read buffer
            int readSize = this.socketChannel.read(this.byteBufferRead);
            // some data was read
            if (readSize > 0) {
                // ...
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    // ...
                    // read the slave's CommitLog sync offset, 8 bytes
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    // ...
                    HAConnection.this.slaveAckOffset = readOffset;
                    // ...
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                // ...
            } else {
                return false;
            }
        } catch (IOException e) {
            return false;
        }
    }
    return true;
}

HAService#notifyTransferSome

public void notifyTransferSome(final long offset) {
    // the offset acknowledged by the slave > the largest offset acknowledged so far
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        // try to advance the watermark
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            // notify GroupTransferService that more data has been replicated
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            // lost the CAS race, re-read the current watermark and retry
            value = this.push2SlaveMaxOffset.get();
        }
    }
}

HAService$GroupTransferService#notifyTransferSome

public void notifyTransferSome() {
    this.notifyTransferObject.wakeup();
}