rocketMq-消息存储-commitLog

简述

    本文章针对rocketMq 开源版本4.8.0进行分析;rocketMq 流程简述如下:

rocketMq-消息存储-commitLog

nameserv:看作注册中心,broker 启动要注册到nameserv中,同时要定时向nameserv发送心跳来告诉nameserv它还活着,然后nameserv除了维护broker信息,还要维护topic的信息,比如一个topic 消息发送到哪几个broker上,然后topic 分为几个message queue等等,现在我们只需要简单知道这个nameserv 维护这broker信息与topic 信息。

broker:一个实例,存储消息队列等

producer:其发送消息分三种模式:同步发送,异步,单向发送,单向就是发送之后就不管结果了,从消息生产角度出发的;

consumer:消息消费与确认消费,消费方式通过queue_offerset  去consumeQueue查找physical_offerset,并通过physical_offerset查到对应消息内容

它们的连接方式通过netty 连接

消息存储:

broker 主节点接收消息后优先存储到commitLog,然后返回消息存储情况;每个broker 节点通过定时任务生成consumeQueue,BuildIndex ;其中consumeQueue用于消费者使用,存储的核心数据为physical_offset;BuildIndex 为消息建立关键字索引

消息存储分三大块讲解:

commitLog ----本文章讲解

consumeQueue

BuildIndex

commitLog 流程概述

    broker收到消息后交给commitLog来存储,commitLog由让干个MappedFile组成,在物理上对应为ROCKET_HOME/commitlog/00000000000000000000,每个文件默认大小是1G,文件的起名为起始offset加上每个文件的大小,比如第二个文件名为00000000001073741824,文件的读写采用内存映射技术(MMP),写采用追加写的模式,消息的offset其实就是每个消息在整个commintLog起始位置;

源码分析

 broker在启动的时候,会将某个code对应的processor注册到server上,不同类型的消息交给不同的处理器去处理,比如说SEND_MESSAGE 这个code的消息就会交给SendMessageProcessor处理,会调用对应processor的 processRequest 方法来处理,我们消息生产者发送消息的时候,就是使用的SEND_MESSAGE 这个code ,接下来我们看下SendMessageProcessor 的processRequest 方法;其中其调用链为

BrokerStartup.start() -》 BrokerController.start() -》 
NettyRemotingServer.start() -》 NettyRemotingServer.prepareSharableHandlers() 
-》 new NettyServerHandler() -》 NettyRemotingAbstract.processMessageReceived() 
-》 NettyRemotingAbstract.processRequestCommand() -》
SendMessageProcessor.processRequest()

 下面核心对SendMessageProcessor源码分析;

rocketMq-消息存储-commitLog

根据code 进入红线部分

rocketMq-消息存储-commitLog

过程中有一些不重点介绍,根据源码我们会走到DefaultMessageStore#putMessage

rocketMq-消息存储-commitLog

接下来我们来看putMessage 我们这次讲的是普通消息存储,就自动跳过事务代码逻辑部分

        // 获取最新的一个mappFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // 获取锁,保持消息的顺序性
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);
            // 重新创建一个文件
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            //向mappedFile 追加消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                    // 当剩余空间不能存储消息的时候,会把之前消息补全,并设置为空,返回此状态,重新创建新的mappeFile ,在追加消息
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);

代码里有我的注释,下面重点介绍消息的追加

 public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
        //获取写的位置
        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            //获取新的字节缓存区
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            // 重新设置新的位置
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            if (messageExt instanceof MessageExtBrokerInner) {
                // 追加消息
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // 更新当前mappedFile 开始位置
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }

doAppend 内容太多,我们只看核心的部分

 rocketMq-消息存储-commitLog

rocketMq-消息存储-commitLog

 // Initialization of storage space
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 TOTALSIZE  消息大小
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

 上面这段代码很重要,为消息的关键属性如下

   wroteOffset:这个就是physicsOffset很关键,以后查找消息的关键属性,定位了消息的物理位置

   消息的长度:消息内容

 queueoffset :队列的偏移量,我们也可以叫为逻辑offset 这个是根据队列id 原子增长的

由于消息内容是肯定的,我们通过physicsOffset 可以找到各个属性的值,包含消息内容;

通过physicsOffset定位消息的开始位置,就可以取到所需数据,针对消息内容各属性是变化的,都会在之前把内容的大小存储起来,这样就可以截取到完整的消息内容了;

rocketMq-消息存储-commitLog

 最后就是根据配置同步或者异步刷盘

rocketMq-消息存储-commitLog

追加成功后解锁,刷盘,根据配置是否同步刷盘,因为这个时候我们存储还在内存,避免数据消失我们可以配置同步刷盘;

总结

 commitlog是一个逻辑上的大文件,下面有很多MappedFile,每个消息都有一个physicsOffset(物理offset),后面不管是构建索引还是ConsumeQueue,存储的都是这个commitlog的offset;

有了physicsOffset 就能很快定位到具体消息开始位置,根据消息内容设置规律原则,就可以拿到消息体各个属性内容,主要有queueId,queue offset ,physicsOffset,消息内容,

上一篇:RocketMQ入门到入土(五)消息持久化存储源码解析


下一篇:RocketMQ源码分析(三)消息存储