RocketMQ - Message Storage

1. Storage Structure


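  • commitLog
    The message storage file. Messages of all topics are stored in the commitLog files.
  • consumeQueue
    Each MessageQueue has its own ConsumeQueue file containing an entry for every message in that queue. It does not store the full message data, only the message's offset in the commitLog (together with its size and tag hash code), so it acts as an index file. After a message reaches the commitLog, it is asynchronously dispatched to the consumeQueue so that consumers can consume it.
  • indexFile
    Maps keys such as the msgId and message keys to commitLog offsets, so messages can be looked up quickly by msgId, message key, etc.
  • Transaction state service
    Stores the transaction state of each message.
  • Scheduled message service
    Each delay level has its own consume queue; this service stores the pull progress of the delay queues.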
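Because every ConsumeQueue entry has a fixed size, the entry for a given logical queue offset can be found with plain arithmetic instead of a search. The sketch below assumes the usual layout of 20 bytes per entry (8-byte commitLog offset, 4-byte message size, 8-byte tag hash code) and 300,000 entries per ConsumeQueue file; ConsumeQueueMath and its methods are hypothetical names for illustration.

```java
// Hypothetical helper: maps a logical consume-queue offset to a file and a position in it.
// Assumes 20-byte entries and 300,000 entries per ConsumeQueue file.
final class ConsumeQueueMath {
    static final int ENTRY_SIZE = 20;            // 8 (commitLog offset) + 4 (size) + 8 (tag hash code)
    static final int ENTRIES_PER_FILE = 300_000;
    static final long FILE_SIZE = (long) ENTRY_SIZE * ENTRIES_PER_FILE; // 6,000,000 bytes

    // Byte position of the entry across all files of this queue.
    static long absolutePosition(long queueOffset) {
        return queueOffset * ENTRY_SIZE;
    }

    // Index of the file that holds the entry (0 = first file).
    static long fileIndex(long queueOffset) {
        return absolutePosition(queueOffset) / FILE_SIZE;
    }

    // Byte position of the entry inside that file.
    static int positionInFile(long queueOffset) {
        return (int) (absolutePosition(queueOffset) % FILE_SIZE);
    }

    public static void main(String[] args) {
        long queueOffset = 300_001; // second entry of the second file
        System.out.println(fileIndex(queueOffset) + " " + positionInFile(queueOffset)); // 1 20
    }
}
```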

2. Storage Flow

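  • broker-processor-SendMessageProcessor
    The broker's request handling lives in the processor package. Messages sent by producers are handled by SendMessageProcessor: its processRequest() method calls asyncSendMessage(), which in turn calls the message store's asyncPutMessage():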
 public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
        }

        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
        }

        long beginTime = this.getSystemClock().now();
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

        putResultFuture.thenAccept((result) -> {
            long elapsedTime = this.getSystemClock().now() - beginTime;
            if (elapsedTime > 500) {
                log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

            if (null == result || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
            }
        });

        return putResultFuture;
    }
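  1. checkStoreStatus(): checks whether the MessageStore is available. Writes are rejected if the Broker has been shut down, if the broker is a slave, or if the store is not writable (for example because the disk is full). printTimes does not limit writes; it only throttles the warning log, which is printed once every 50,000 rejected attempts.
  2. checkMessage(): checks the message. A message whose topic is longer than 127 characters is rejected, as is one whose properties string is longer than 32767 characters.
  3. Write the data into the commitLog: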
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // ... some code omitted (including the declaration of AppendMessageResult result)
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        // ... handling of the append result omitted
    } finally {
        putMessageLock.unlock();
    }
    // ... remaining code omitted
}

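This step sets the message's store timestamp. If mappedFile is null, the commitLog directory contains no files yet, i.e. this is the very first message; if the last file is full, a new file is needed as well. In both cases getLastMappedFile(0) creates a new file named after its starting offset, left-padded with zeros to 20 digits. If the file cannot be created, the disk may be full.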
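The naming rule can be reproduced in a few lines: a file is named after the commitLog offset of its first byte, zero-padded to 20 digits, so the next file's name is the previous start offset plus the file size. A minimal sketch, assuming the default commitLog file size of 1 GB; CommitLogFileNames is a hypothetical helper (RocketMQ has its own utility for this, UtilAll.offset2FileName in the versions I know).

```java
// Hypothetical sketch of the commitLog file naming rule.
final class CommitLogFileNames {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // assumed default: 1 GB per commitLog file

    // Name = start offset of the file, left-padded with zeros to 20 digits.
    static String fileName(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // The next file starts exactly where the previous one ends.
    static String nextFileName(String currentName) {
        return fileName(Long.parseLong(currentName) + FILE_SIZE);
    }

    public static void main(String[] args) {
        String first = fileName(0);
        System.out.println(first);               // 00000000000000000000
        System.out.println(nextFileName(first)); // 00000000001073741824
    }
}
```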

  • appendMessage()
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

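It first reads the MappedFile's current write pointer (wrotePosition). If it is greater than or equal to the file size, the file is full and an UNKNOWN_ERROR result is returned (no exception is thrown). Otherwise, slice() creates a ByteBuffer that shares memory with the MappedFile (writeBuffer if it is set, otherwise mappedByteBuffer), its position is set to the current write pointer, and the buffer is handed to the callback's doAppend().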
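The write-pointer handling in appendMessagesInner can be illustrated with a heap ByteBuffer standing in for the memory-mapped buffer. SimpleMappedFile is a hypothetical, single-threaded simplification: the real MappedFile maps an actual file, delegates serialization to an AppendMessageCallback, and is called while CommitLog holds putMessageLock.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for MappedFile: a heap buffer instead of an mmap'ed file.
final class SimpleMappedFile {
    private final ByteBuffer buffer;                          // stands in for mappedByteBuffer
    private final AtomicInteger wrotePosition = new AtomicInteger(0);
    private final int fileSize;

    SimpleMappedFile(int fileSize) {
        this.fileSize = fileSize;
        this.buffer = ByteBuffer.allocate(fileSize);
    }

    // Returns the position the data was written at, or -1 if it does not fit.
    int append(byte[] data) {
        int currentPos = wrotePosition.get();
        if (currentPos >= fileSize || data.length > fileSize - currentPos) {
            return -1;                                        // full: the caller must roll to a new file
        }
        ByteBuffer slice = buffer.slice();                    // shares content, has its own position
        slice.position(currentPos);
        slice.put(data);
        wrotePosition.addAndGet(data.length);                 // advance the write pointer
        return currentPos;
    }

    int getWrotePosition() { return wrotePosition.get(); }

    byte byteAt(int index) { return buffer.get(index); }
}
```

Because slice() shares the underlying memory but not the position, the shared buffer's own position never moves, which is why the real code can hand out slices freely.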

  • doAppend()
 long wroteOffset = fileFromOffset + byteBuffer.position();

            int sysflag = msgInner.getSysFlag();

            int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

            this.resetByteBuffer(storeHostHolder, storeHostLength);
            String msgId;
            if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
                msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            } else {
                msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            }

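This creates a globally unique 16-byte message ID:
4 bytes IP + 4 bytes port + 8 bytes commitLog offset
(With an IPv6 store host the IP takes 16 bytes, which is why the code keeps a separate msgIdV6Memory buffer.) For readability the msgId is converted to a hex string; it can later be converted back into bytes and the offset extracted to locate the message.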
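A minimal sketch of the 16-byte IPv4 layout and its hex string form, assuming big-endian byte order (ByteBuffer's default) and uppercase hex. MsgIdCodec is hypothetical; in RocketMQ the ID is produced by MessageDecoder.createMessageId, shown above. The point is that the last 16 hex characters carry the commitLog offset, so the message can be located without any lookup table.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: 4-byte IPv4 + 4-byte port + 8-byte commitLog offset, hex-encoded.
final class MsgIdCodec {
    static String encode(byte[] ipv4, int port, long commitLogOffset) {
        ByteBuffer buf = ByteBuffer.allocate(16);        // big-endian by default
        buf.put(ipv4, 0, 4);
        buf.putInt(port);
        buf.putLong(commitLogOffset);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b & 0xFF));  // two hex characters per byte
        }
        return sb.toString();
    }

    // The physical offset is the last 8 bytes, i.e. the last 16 hex characters.
    static long offsetOf(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        String id = encode(new byte[]{(byte) 192, (byte) 168, 0, 1}, 10911, 4096L);
        System.out.println(id + " -> offset " + offsetOf(id));
    }
}
```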

  • calMsgLength
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
        int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
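        final int msgLen = 4 //TOTALSIZE: total length of this message entry, in bytes
            + 4 //MAGICCODE: magic number, fixed value daa320a7
            + 4 //BODYCRC: checksum of the message body
            + 4 //QUEUEID: consume queue ID
            + 4 //FLAG
            + 8 //QUEUEOFFSET: offset of the message in its consume queue
            + 8 //PHYSICALOFFSET: offset of the message in the CommitLog file
            + 4 //SYSFLAG: system flags, e.g. whether the message is compressed or transactional
            + 8 //BORNTIMESTAMP: time at which the producer called the send API
            + bornhostLength //BORNHOST: producer IP and port
            + 8 //STORETIMESTAMP: time the message was stored
            + storehostAddressLength //STOREHOSTADDRESS: broker IP + port
            + 4 //RECONSUMETIMES: number of retries
            + 8 //Prepared Transaction Offset: transaction message offset
            + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY: 4-byte body length + body
            + 1 + topicLength //TOPIC: 1-byte topic length + topic
            + 2 + (propertiesLength > 0 ? propertiesLength : 0) //properties: 2-byte length + properties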
            + 0;
        return msgLen;
    }

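The total message length is computed from the body, topic and properties lengths according to the storage format.
If the message does not fit into the space left in the current commitLog file, the rest of the file is filled with a blank marker, END_OF_FILE is returned, and a new file is created to store the message.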
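For IPv4 born and store hosts, the fixed fields add up to 84 bytes, and the three length prefixes add 4 + 1 + 2 more, so an empty message with an empty topic is 91 bytes. The sketch below re-derives calMsgLength for that case and adds the "does it still fit" test. The 8-byte reserve for the end-of-file blank marker (total size + magic code) mirrors END_FILE_MIN_BLANK_LENGTH in the RocketMQ versions I have looked at; treat it as an assumption. MsgLength is a hypothetical class.

```java
// Hypothetical re-derivation of calMsgLength for IPv4 hosts, plus the "fits in this file" check.
final class MsgLength {
    // Assumed reserve for the blank end-of-file marker (4-byte total size + 4-byte magic code).
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;

    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4 + 4 + 4 + 4 + 4                  // TOTALSIZE, MAGICCODE, BODYCRC, QUEUEID, FLAG
            + 8 + 8                               // QUEUEOFFSET, PHYSICALOFFSET
            + 4 + 8                               // SYSFLAG, BORNTIMESTAMP
            + 8                                   // BORNHOST (IPv4: 4-byte IP + 4-byte port)
            + 8                                   // STORETIMESTAMP
            + 8                                   // STOREHOSTADDRESS (IPv4)
            + 4 + 8                               // RECONSUMETIMES, Prepared Transaction Offset
            + 4 + Math.max(bodyLength, 0)         // body length prefix + body
            + 1 + topicLength                     // topic length prefix + topic
            + 2 + Math.max(propertiesLength, 0);  // properties length prefix + properties
    }

    // True if the message fits; false means the file is padded and a new file is created.
    static boolean fits(int msgLen, int remainingBytes) {
        return msgLen + END_FILE_MIN_BLANK_LENGTH <= remainingBytes;
    }

    public static void main(String[] args) {
        int len = calMsgLength(100, 5, 0);
        System.out.println(len + " " + fits(len, 200) + " " + fits(len, 210)); // 196 false true
    }
}
```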

// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

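The message bytes are copied into the ByteBuffer and an AppendMessageResult is created. At this point the message is only in the MappedFile's memory-mapped buffer (or its writeBuffer); it has not been flushed to disk.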

3. Storage File Organization and Memory Mapping


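  • MappedFileQueue
    Manages all MappedFiles; it is the abstraction of a storage directory.
    1) String storePath: storage directory
    2) int mappedFileSize: size of a single file
    3) CopyOnWriteArrayList<MappedFile> mappedFiles: the MappedFile collection
    4) AllocateMappedFileService allocateMappedFileService: service that creates MappedFiles
    5) long flushedWhere = 0: current flush pointer; all data before it has been persisted to disk
    6) long committedWhere = 0: current commit pointer, i.e. the current write pointer of the in-memory ByteBuffer;
    always greater than or equal to flushedWhere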

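  • MappedFile
    1) int OS_PAGE_SIZE: operating system page size, 4 KB by default
    2) AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY: total virtual memory mapped by MappedFiles in the current JVM instance
    3) AtomicInteger TOTAL_MAPPED_FILES: number of MappedFile objects in the current JVM instance
    4) AtomicInteger wrotePosition: write pointer of this file, starting at 0 (the write pointer in the memory-mapped file)
    5) AtomicInteger committedPosition: commit pointer of this file. If transientStorePoolEnable is on, data is first
    stored in the TransientStorePool, then committed to the file's FileChannel, and then flushed to disk
    6) AtomicInteger flushedPosition: flush pointer; data before it has been persisted to disk
    7) int fileSize: file size
    8) FileChannel fileChannel: file channel
    9) ByteBuffer writeBuffer: off-heap ByteBuffer. If it is not null, data is first stored in this
    buffer and then committed to the MappedFile's FileChannel. It is non-null when transientStorePoolEnable
    is true
    10) TransientStorePool transientStorePool: off-heap memory pool, enabled when transientStorePoolEnable is true
    11) String fileName: file name
    12) long fileFromOffset: starting offset of this file
    13) File file: the physical file
    14) MappedByteBuffer mappedByteBuffer: memory-mapped buffer of the physical file
    15) volatile long storeTimestamp: time of the last write to this file
    16) boolean firstCreateInQueue: whether this is the first file in the MappedFileQueue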
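Since every file in a MappedFileQueue has the same size and files are named by start offset, the file holding a physical offset can be found by division instead of a search. A minimal sketch over a list of start offsets; MappedFileLocator is hypothetical. Subtracting the first file's index matters because expired files are deleted from the head of the queue, which is also what RocketMQ's findMappedFileByOffset has to account for.

```java
import java.util.List;

// Hypothetical sketch: locate the file holding a physical offset in a queue of equal-sized files.
final class MappedFileLocator {
    // fileStartOffsets: start offset of each live file, ascending (head files may have been deleted).
    static int indexOf(List<Long> fileStartOffsets, long fileSize, long offset) {
        if (fileStartOffsets.isEmpty()) {
            return -1;
        }
        long first = fileStartOffsets.get(0);
        long last = fileStartOffsets.get(fileStartOffsets.size() - 1);
        if (offset < first || offset >= last + fileSize) {
            return -1;                              // not covered by any live file
        }
        // Subtract the first file's index so deleted head files do not shift the result.
        return (int) (offset / fileSize - first / fileSize);
    }

    public static void main(String[] args) {
        // Files start at 1000, 2000, 3000 (the file starting at 0 was already deleted).
        System.out.println(indexOf(List.of(1000L, 2000L, 3000L), 1000, 2500)); // 1
    }
}
```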

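  • The commitLog runs a service thread (CommitRealTimeService, started only when transientStorePoolEnable is true) that periodically calls commit on the MappedFiles: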

@Override
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

                int commitDataThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

                long begin = System.currentTimeMillis();
                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                    this.lastCommitTimestamp = begin;
                    commitDataLeastPages = 0;
                }

                try {
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                    long end = System.currentTimeMillis();
                    if (!result) {
                        this.lastCommitTimestamp = end; // result = false means some data committed.
                        //now wake up flush thread.
                        flushCommitLogService.wakeup();
                    }

                    if (end - begin > 500) {
                        log.info("Commit data to file costs {} ms", end - begin);
                    }
                    this.waitForRunning(interval);
                } catch (Throwable e) {
                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
                }
            }

            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            CommitLog.log.info(this.getServiceName() + " service end");
        }
    }
  • commit() (commits the data in the MappedFile's writeBuffer to the FileChannel)
    public int commit(final int commitLeastPages) {
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return this.wrotePosition.get();
        }
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                commit0(commitLeastPages);
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }

        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }

        return this.committedPosition.get();
    }

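Performs the commit. commitLeastPages is the minimum number of pages to commit in one go; if fewer pages are waiting, nothing is committed this time and the data waits for the next round. If writeBuffer is null, the method simply returns wrotePosition without committing, which shows that commit operates only on writeBuffer. Once the whole file has been committed, writeBuffer is returned to the TransientStorePool.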
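The "enough pages" test compares page indexes rather than byte counts. The sketch below mirrors that check as I understand MappedFile.isAbleToCommit; the 4 KB page size is assumed and CommitThreshold is hypothetical. With commitLeastPages = 0, which CommitRealTimeService passes once commitCommitLogThoroughInterval has elapsed, any uncommitted byte is committed.

```java
// Hypothetical sketch of the commit threshold check.
final class CommitThreshold {
    static final int OS_PAGE_SIZE = 4 * 1024; // assumed 4 KB pages

    static boolean isAbleToCommit(int committedPosition, int wrotePosition, int fileSize, int commitLeastPages) {
        if (wrotePosition == fileSize) {
            return true;                                // a full file is always committed
        }
        if (commitLeastPages > 0) {
            // Number of page boundaries crossed since the last commit.
            return (wrotePosition / OS_PAGE_SIZE) - (committedPosition / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrotePosition > committedPosition;       // leastPages == 0: commit anything pending
    }

    public static void main(String[] args) {
        int fileSize = 1 << 20;
        System.out.println(isAbleToCommit(0, 8 * 1024, fileSize, 4));  // false: only 2 pages
        System.out.println(isAbleToCommit(0, 16 * 1024, fileSize, 4)); // true: 4 pages
        System.out.println(isAbleToCommit(0, 100, fileSize, 0));       // true: something pending
    }
}
```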

  • When data has been committed, CommitRealTimeService wakes up flushCommitLogService, which persists it through MappedFile.flush():
 public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }
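  • Flushing to disk
    Calling force() on mappedByteBuffer or fileChannel persists the in-memory data to disk, after which flushedPosition is set to the read position. If writeBuffer is not null, flushedPosition equals the last commit pointer, because the data committed last time is exactly what has gone into the FileChannel. If writeBuffer is null, data is written directly into the MappedByteBuffer and wrotePosition is the pointer within it, so flushedPosition is set to wrotePosition.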
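The rule for what flushedPosition becomes fits in one method. A minimal sketch of the pointer bookkeeping with the force() call left out; FlushPointers is hypothetical, but getReadPosition() follows the rule described above (committedPosition when a writeBuffer is used, wrotePosition otherwise).

```java
// Hypothetical sketch of the pointer bookkeeping around flush().
final class FlushPointers {
    int wrotePosition;              // bytes appended so far
    int committedPosition;          // bytes committed from writeBuffer to the FileChannel
    int flushedPosition;            // bytes known to be on disk
    final boolean usesWriteBuffer;  // true when transientStorePoolEnable is on

    FlushPointers(boolean usesWriteBuffer) {
        this.usesWriteBuffer = usesWriteBuffer;
    }

    // Data that has reached the file (page cache) and may be flushed.
    int getReadPosition() {
        return usesWriteBuffer ? committedPosition : wrotePosition;
    }

    // After force() succeeds, everything up to the read position is durable.
    void afterForce() {
        flushedPosition = getReadPosition();
    }
}
```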
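  • MappedFile destruction
    Files are kept for fileReservedTime hours and deleted once they expire (see 4.4). The code default is 72 hours; the sample broker.conf shipped with RocketMQ sets 48.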

4. RocketMQ Storage Files

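1) commitlog: message storage directory
2) config: configuration data used at runtime, mainly:
consumerFilter.json: topic message filter information
consumerOffset.json: consume progress in clustering mode
delayOffset.json: pull progress of the delay message queues
subscriptionGroup.json: consumer group configuration
topics.json: topic configuration
3) consumequeue: consume queue storage directory
4) index: message index file storage directory
5) abort: if this file exists, the Broker was not shut down normally. It is created when the broker starts and deleted before a normal exit
6) checkpoint: checkpoint file, storing the timestamp of the last commitlog flush, the last consumequeue flush time and the timestamp of the last index file flush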
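The checkpoint file is tiny. As far as I know, StoreCheckpoint stores the three timestamps as consecutive 8-byte longs at offsets 0, 8 and 16; the sketch below reproduces that layout in a plain ByteBuffer. CheckpointLayout is hypothetical, and the real class keeps the file memory-mapped and calls force() after updating it.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the checkpoint layout: three 8-byte timestamps.
final class CheckpointLayout {
    static final int PHYSIC_OFFSET = 0;   // last commitlog flush timestamp
    static final int LOGICS_OFFSET = 8;   // last consumequeue flush timestamp
    static final int INDEX_OFFSET = 16;   // last index file flush timestamp

    static byte[] write(long physicMsgTimestamp, long logicsMsgTimestamp, long indexMsgTimestamp) {
        ByteBuffer buf = ByteBuffer.allocate(24);
        buf.putLong(PHYSIC_OFFSET, physicMsgTimestamp);   // absolute puts: position is not used
        buf.putLong(LOGICS_OFFSET, logicsMsgTimestamp);
        buf.putLong(INDEX_OFFSET, indexMsgTimestamp);
        return buf.array();
    }

    static long read(byte[] data, int fieldOffset) {
        return ByteBuffer.wrap(data).getLong(fieldOffset);
    }
}
```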

4.1 Updating ConsumeQueue and IndexFile in Real Time

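When the Broker starts, it starts a ReputMessageService thread and initializes an important parameter, reputFromOffset: the commitLog offset from which the thread starts dispatching messages to the ConsumeQueue and IndexFile. If duplicate dispatching is allowed (duplicationEnable), reputFromOffset is set to the commitLog's confirm (commit) offset; otherwise it is set to the commitLog's maximum offset in memory. The thread then calls doReput() roughly every millisecond: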

      public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }
  • doReput()
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

1. Returns all valid data starting at reputFromOffset.

if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();
                        // read messages one by one
                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            // validate the message and return its size
                            DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            // a DispatchRequest is built; if the message size is > 0, doDispatch() is called
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

2. The actual dispatching is done by CommitLogDispatcherBuildConsumeQueue and CommitLogDispatcherBuildIndex.
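The reput loop boils down to: read what has been written past reputFromOffset, decode one message at a time, hand each message to every dispatcher, and advance reputFromOffset by the message size. A minimal single-threaded sketch; here a record is just a 4-byte total size followed by its payload, and Dispatcher and ReputSketch are hypothetical stand-ins for CommitLogDispatcher and ReputMessageService (the real service also validates each message and skips the blank padding at the end of a file).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CommitLogDispatcher.
interface Dispatcher {
    void dispatch(long commitLogOffset, int size);
}

// Hypothetical, single-threaded sketch of ReputMessageService.doReput().
final class ReputSketch {
    long reputFromOffset;
    final List<Dispatcher> dispatchers = new ArrayList<>();

    // log: the commitLog bytes; maxOffset: how far the log has been written.
    void doReput(ByteBuffer log, long maxOffset) {
        while (reputFromOffset + 4 <= maxOffset) {
            int size = log.getInt((int) reputFromOffset);   // TOTALSIZE field
            if (size <= 0 || reputFromOffset + size > maxOffset) {
                break;                                      // no complete message yet
            }
            for (Dispatcher d : dispatchers) {
                d.dispatch(reputFromOffset, size);          // e.g. build ConsumeQueue / IndexFile entries
            }
            reputFromOffset += size;                        // move past this message
        }
    }
}
```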

  • ConsumeQueue implementation
  class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

putMessagePositionInfo()
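Writes the message's commitLog offset, size and tag hash code into a ByteBuffer in turn, computes the physical position in the ConsumeQueue from the consume queue offset, and appends the entry to the memory-mapped file (MappedFile). No flush to disk happens here.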
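A minimal sketch of encoding one 20-byte ConsumeQueue entry in the order described above. CqEntry is hypothetical; using String.hashCode() of the tag as the tag hash code matches what RocketMQ does for ordinary messages as far as I know (delayed messages store a different value there), so treat it as an assumption.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of one ConsumeQueue entry: offset (8) + size (4) + tag hash code (8).
final class CqEntry {
    static final int SIZE = 20;

    static ByteBuffer encode(long commitLogOffset, int msgSize, String tags) {
        // Assumption: plain tag hash code for ordinary messages, 0 when there is no tag.
        long tagsCode = (tags == null || tags.isEmpty()) ? 0 : tags.hashCode();
        ByteBuffer buf = ByteBuffer.allocate(SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(msgSize);
        buf.putLong(tagsCode);
        buf.flip();                 // ready to be appended to the ConsumeQueue file
        return buf;
    }
}
```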

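  • IndexFile
    If messageIndexEnable is true, IndexService.buildIndex() is called to build a hash index; otherwise this dispatch is ignored.
    1. Get or create the IndexFile and read the maximum physical offset stored in the index files. If the message's physical offset is smaller than that, the message is a duplicate and is skipped.
    2. If the message's unique key is not empty, add it to the hash index so that the message can be found quickly.
    3. Build index entries for the message keys. A message can have several keys, separated by spaces.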
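IndexFile is essentially a hash table laid out in a file: an array of hash slots, each holding the number of the latest index entry for that slot, plus an append-only array of entries, each of which links to the previous entry in the same slot. The in-memory model below uses tiny sizes (the defaults are in the millions) and stores only the key hash and the physical offset per entry; the real entry also stores a time difference, and RocketMQ indexes topic#key rather than the bare key. MiniIndex is hypothetical, and because different keys can share a hash, results are candidates that must be checked against the stored message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the IndexFile hash index.
final class MiniIndex {
    private final int[] slots;          // slot -> latest entry number (0 = empty)
    private final long[] phyOffsets;    // entry -> commitLog physical offset
    private final int[] keyHashes;      // entry -> key hash
    private final int[] prevEntry;      // entry -> previous entry in the same slot (0 = none)
    private int count = 1;              // entry 0 is reserved to mean "none"

    MiniIndex(int slotNum, int maxEntries) {
        slots = new int[slotNum];
        phyOffsets = new long[maxEntries + 1];
        keyHashes = new int[maxEntries + 1];
        prevEntry = new int[maxEntries + 1];
    }

    private int slotOf(int hash) {
        return Math.floorMod(hash, slots.length);
    }

    boolean put(String key, long phyOffset) {
        if (count >= phyOffsets.length) {
            return false;                          // full: RocketMQ would start a new IndexFile
        }
        int hash = key.hashCode();
        int slot = slotOf(hash);
        phyOffsets[count] = phyOffset;
        keyHashes[count] = hash;
        prevEntry[count] = slots[slot];            // chain to the previous head of this slot
        slots[slot] = count++;
        return true;
    }

    // Candidate offsets for a key, newest first.
    List<Long> get(String key) {
        int hash = key.hashCode();
        List<Long> result = new ArrayList<>();
        for (int e = slots[slotOf(hash)]; e != 0; e = prevEntry[e]) {
            if (keyHashes[e] == hash) {
                result.add(phyOffsets[e]);
            }
        }
        return result;
    }
}
```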

4.2 Consume Queue and Index File Recovery

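The Broker stores every message in full in the commitLog, while a separate thread asynchronously dispatches messages to the ConsumeQueue and IndexFile. If the Broker goes down before a dispatch completes, the commitLog, ConsumeQueue and IndexFile become inconsistent: some messages exist in the commitLog but not in the ConsumeQueue, and without recovery they would never be consumed.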

  • DefaultMessageStore.load()
public boolean load() {
        boolean result = true;

        try {
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
            // load the delay (scheduled) message progress
            if (null != scheduleMessageService) {
                result = result && this.scheduleMessageService.load();
            }

            // load Commit Log
            result = result && this.commitLog.load();

            // load Consume Queue
            result = result && this.loadConsumeQueue();

            if (result) {
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

                this.indexService.load(lastExitOK);

                this.recover(lastExitOK);

                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
            }
        } catch (Exception e) {
            log.error("load exception", e);
            result = false;
        }

        if (!result) {
            this.allocateMappedFileService.shutdown();
        }

        return result;
    }

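First, load() determines whether the last shutdown was normal. The Broker creates an abort file on startup, and a shutdown hook deletes it on exit. If the abort file still exists at the next startup, the last exit was abnormal and the files need to be repaired.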
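The abort-file convention can be sketched with java.nio.file: check for the marker first, then create it at startup and delete it in a shutdown hook. AbortMarker is hypothetical; RocketMQ keeps the file, named abort, in the store root directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the abort-file convention for detecting unclean shutdowns.
final class AbortMarker {
    private final Path abortFile;

    AbortMarker(Path storeRoot) {
        this.abortFile = storeRoot.resolve("abort");
    }

    // Call before markRunning(): true means the previous run did not exit cleanly.
    boolean lastExitWasAbnormal() {
        return Files.exists(abortFile);
    }

    // Startup: create the marker and remove it again on a normal JVM exit.
    void markRunning() throws IOException {
        if (!Files.exists(abortFile)) {
            Files.createFile(abortFile);
        }
        Runtime.getRuntime().addShutdownHook(new Thread(this::markStopped));
    }

    // Normal shutdown: remove the marker.
    void markStopped() {
        try {
            Files.deleteIfExists(abortFile);
        } catch (IOException ignored) {
            // best effort: a leftover marker only triggers an extra recovery pass
        }
    }
}
```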

  • MappedFileQueue.load()
    public boolean load() {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {

                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }

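1. Load the commitLog files, sorted by file name. If a file's size differs from the configured single-file size (mappedFileSize), load() logs a warning and returns false, so the store fails to load and the file has to be checked manually. Every loaded file is initially marked as fully written, committed and flushed; recovery later corrects the pointers of the last files.
2. Load the consume queues in the same way, building a ConsumeQueue for each queue.
3. Load the store checkpoint, which records the last flush times of the commitLog, consume queues and index files; recovery uses it to decide where to start.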
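Sorting by file name is enough to get offset order because every name is zero-padded to the same width, so lexicographic order equals numeric order. A minimal sketch of the ordering and size check using names and lengths instead of real files; LoadCheck is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the ordering and size validation done by MappedFileQueue.load().
final class LoadCheck {
    // Returns the file names in load order, or null if any file has an unexpected size.
    static List<String> loadOrder(Map<String, Long> fileLengths, long mappedFileSize) {
        List<String> ordered = new ArrayList<>();
        // TreeMap sorts names lexicographically; with 20-digit zero padding this is offset order.
        for (Map.Entry<String, Long> e : new TreeMap<>(fileLengths).entrySet()) {
            if (e.getValue() != mappedFileSize) {
                return null;        // mirrors "length not matched ... return false"
            }
            ordered.add(e.getKey());
        }
        return ordered;
    }
}
```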

4.2.1 Recovery After a Normal Shutdown

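  • CommitLog.recoverNormally()
    Recovery starts from the third-to-last file, or from the first file if there are fewer than three. It walks through the commitLog one message at a time; a message is valid if the check succeeds and its size is greater than 0. The end of the last valid message becomes the new write, commit and flush position, and any data after it is truncated.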
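The normal-recovery scan can be sketched over a simplified file in which each record is a 4-byte size followed by its payload and a size of 0 marks unwritten space. The start-file rule and the "stop at the first invalid record" rule follow the description above; NormalRecovery is hypothetical, and the real code decodes and validates complete messages and moves across file boundaries.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the scan performed by normal recovery.
final class NormalRecovery {
    // File to start from: third-to-last, or the first if there are fewer than three.
    static int startFileIndex(int fileCount) {
        return Math.max(0, fileCount - 3);
    }

    // Returns the end offset of the last valid record in the buffer.
    static int validEnd(ByteBuffer file) {
        int pos = 0;
        while (pos + 4 <= file.limit()) {
            int size = file.getInt(pos);
            if (size <= 0 || pos + size > file.limit()) {
                break;              // unwritten space or a torn record: stop here
            }
            pos += size;            // valid record, move to the next one
        }
        return pos;                 // data beyond this offset is truncated
    }
}
```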

4.2.2 Recovery After an Abnormal Shutdown

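  • CommitLog.recoverAbnormally()
    Starting from the last file and moving backward, it looks for the first file whose messages are known to be stored correctly, i.e. whose first message's store timestamp is not later than the flush timestamp recorded in the checkpoint. It then replays the messages from that file onward, dispatching them to the ConsumeQueue and IndexFile again.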
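The backward search can be sketched as: walk the files from newest to oldest and stop at the first one whose first message was stored no later than the checkpoint timestamp, falling back to the first file if none qualifies. This follows my reading of RocketMQ's isMappedFileMatchedRecover; AbnormalRecovery and its input (one first-message timestamp per file) are simplifications.

```java
import java.util.List;

// Hypothetical sketch of how abnormal recovery picks its starting file.
final class AbnormalRecovery {
    // firstMsgStoreTimestamps: store time of the first message in each file, oldest file first
    // (0 means the file has no valid first message). checkpointTimestamp: flush time from the checkpoint.
    static int startFileIndex(List<Long> firstMsgStoreTimestamps, long checkpointTimestamp) {
        for (int i = firstMsgStoreTimestamps.size() - 1; i >= 0; i--) {
            long ts = firstMsgStoreTimestamps.get(i);
            if (ts != 0 && ts <= checkpointTimestamp) {
                return i;           // this file's start is covered by the last known flush
            }
        }
        return 0;                   // nothing matched: replay from the first file
    }
}
```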

4.3 Flush Mechanism

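RocketMQ storage reads and writes are based on the JDK NIO memory-mapping mechanism (MappedByteBuffer). When a message is stored, it is first appended to memory and then flushed to disk at a time determined by the configured flush policy. With synchronous flushing, MappedByteBuffer.force() is called right after the message is appended to memory; with asynchronous flushing, the broker replies to the producer as soon as the message is appended, and a separate thread flushes at a configured frequency. CommitLog, ConsumeQueue and IndexFile are flushed in similar ways, except that IndexFile is not flushed on a timer; it is flushed as the index is updated (when a full index file is replaced by a new one).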

4.3.1 Synchronous Flush

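Right after a message is appended to the memory of the memory-mapped file, the data is flushed from memory to the disk file. This is implemented in CommitLog.handleDiskFlush():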

   if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
                PutMessageStatus flushStatus = null;
                try {
                    flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                            TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    //flushOK=false;
                }
                if (flushStatus != PutMessageStatus.PUT_OK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }

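1. Build a GroupCommitRequest and submit it to the GroupCommitService flush thread (if the message does not need to wait for the store result, the service is only woken up).
2. Wait for the flush to complete. If it does not complete within syncFlushTimeout (5 s by default), FLUSH_DISK_TIMEOUT is returned; if the flush succeeds, the result is returned to the caller normally.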
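The request and future hand-off can be sketched with a BlockingQueue and CompletableFuture: the writer enqueues a request carrying the offset it needs flushed (wroteOffset + wroteBytes) and waits on the future with a timeout. SyncFlushSketch is hypothetical; its flush thread only records the offset instead of calling force(), and it handles one request at a time, whereas GroupCommitService batches requests by swapping a write list and a read list.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of synchronous flushing: writers wait on a future completed by a flush thread.
final class SyncFlushSketch {
    static final class FlushRequest {
        final long nextOffset;                                  // wroteOffset + wroteBytes
        final CompletableFuture<Boolean> done = new CompletableFuture<>();
        FlushRequest(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final BlockingQueue<FlushRequest> requests = new LinkedBlockingQueue<>();
    private volatile long flushedWhere = 0;                     // how far the log is known to be on disk

    SyncFlushSketch() {
        Thread flusher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    FlushRequest req = requests.take();
                    // Stand-in for the real flush (force()) up to req.nextOffset.
                    flushedWhere = Math.max(flushedWhere, req.nextOffset);
                    req.done.complete(flushedWhere >= req.nextOffset);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sync-flush-sketch");
        flusher.setDaemon(true);
        flusher.start();
    }

    // Returns "PUT_OK" or "FLUSH_DISK_TIMEOUT", like the status handling above.
    String putAndWait(long wroteOffset, int wroteBytes, long timeoutMillis) {
        FlushRequest req = new FlushRequest(wroteOffset + wroteBytes);
        requests.add(req);
        try {
            boolean ok = req.done.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return ok ? "PUT_OK" : "FLUSH_DISK_TIMEOUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "FLUSH_DISK_TIMEOUT";
        } catch (ExecutionException | TimeoutException e) {
            return "FLUSH_DISK_TIMEOUT";
        }
    }
}
```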

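  • GroupCommitService thread
    The service thread blocks in waitForRunning() until it is woken up (e.g. by putRequest()) or the wait interval expires, then processes the pending flush requests: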
    protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

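After all pending flush requests are processed, physicMsgTimestamp in the store checkpoint (StoreCheckpoint) is updated, but the checkpoint itself is not flushed here; that happens when the consume queue files are flushed.
Summary: with synchronous flushing, once the broker has appended the producer's message to the memory-mapped file (memory), it must immediately flush that memory to disk; calling force() on the memory-mapped file writes the in-memory data to disk.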

4.3.2 Asynchronous Flush

The message is first appended to memory, and a flush thread (FlushRealTimeService) periodically flushes the data to disk.
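A minimal sketch of timer-driven flushing with ScheduledExecutorService. FlushTarget and AsyncFlushSketch are hypothetical. The real FlushRealTimeService is a loop rather than an executor; with its defaults as I know them (flushIntervalCommitLog = 500 ms, flushCommitLogLeastPages = 4) it flushes only when at least 4 pages are dirty, except that every flushCommitLogThoroughInterval (10 s) it flushes regardless.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of timer-driven asynchronous flushing.
final class AsyncFlushSketch {
    // Hypothetical abstraction: flush if at least leastPages dirty pages are pending; returns pages flushed.
    interface FlushTarget {
        int flush(int leastPages);
    }

    static ScheduledExecutorService start(FlushTarget target, long intervalMillis, int leastPages) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "async-flush-sketch");
            t.setDaemon(true);                  // do not keep the JVM alive
            return t;
        });
        ses.scheduleWithFixedDelay(() -> {
            try {
                target.flush(leastPages);
            } catch (RuntimeException e) {
                // swallow so that one failed flush does not cancel all future runs
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

For example, AsyncFlushSketch.start(pages -> 0, 500, 4) runs a no-op flush every 500 ms; the producer never waits on it, which is exactly why a crash can lose the most recent unflushed messages.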

4.4 Expired File Deletion

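If a file other than the one currently being written has not been updated for a certain time, it is considered expired and can be deleted. RocketMQ does not check whether all messages in the file have been consumed. By default each file expires after 72 hours; this can be changed with fileReservedTime in the Broker configuration file, in hours.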

 private void addScheduleTask() {

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

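cleanFilesPeriodically() is scheduled every 10 s (after an initial delay of 60 s) to check whether expired files should be deleted; the interval is set by cleanResourceInterval (10 s by default).
It cleans both the commitLog and the consumeQueue files, which share the same deletion mechanism.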

  • CleanCommitLogService - deleteExpireFiles()
        // File retention time: a file whose last update is older than this is expired
        long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
        // Interval between deleting two physical files, since one run may delete several files
        int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
        // A file still referenced by other threads cannot be deleted; the time of the first refused
        // attempt is recorded, and this value is the maximum time the file may survive after that
        int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
        // Deletion runs if any one of the following three conditions holds
        boolean timeup = this.isTimeToDelete(); // deletion time reached: 4 a.m. by default, configurable (deleteWhen in broker.conf)
        boolean spacefull = this.isSpaceToDelete(); // disk space is running low
        boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; // reserved for manual triggering via executeDeleteFilesManually(); not exposed yet
           if (timeup || spacefull || manualDelete) {

                if (manualDelete)
                    this.manualDeleteFileSeveralTimes--;

                boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

                log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
                    fileReservedTime,
                    timeup,
                    spacefull,
                    manualDeleteFileSeveralTimes,
                    cleanAtOnce);

                fileReservedTime *= 60 * 60 * 1000;

                deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                    destroyMapedFileIntervalForcibly, cleanAtOnce);
                if (deleteCount > 0) {
                } else if (spacefull) {
                    log.warn("disk space will be full soon, but delete file failed.");
                }
            }
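The timeup condition compares the current hour with deleteWhen, which may list several hours separated by ';' (for example "04" or "04;16"). A minimal sketch; DeleteWhen is hypothetical and takes the hour as a parameter so that it can be tested, whereas RocketMQ reads the current time itself.

```java
// Hypothetical sketch of the "timeup" check: deleteWhen lists hours of the day separated by ';'.
final class DeleteWhen {
    static boolean isTimeToDelete(String deleteWhen, int currentHourOfDay) {
        for (String hour : deleteWhen.split(";")) {
            if (Integer.parseInt(hour.trim()) == currentHourOfDay) {
                return true;        // e.g. "04" matches 4 a.m.
            }
        }
        return false;
    }
}
```

In a real service the hour would come from the clock, e.g. DeleteWhen.isTimeToDelete("04", java.time.LocalTime.now().getHour()).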
  • isSpaceToDelete
       private boolean isSpaceToDelete() {
            // maximum disk usage ratio allowed for the commitLog/consumeQueue (diskMaxUsedSpaceRatio, in percent)
            double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;

            cleanImmediately = false;

            {
                String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
                // disk usage ratio of the partition holding the commitLog
                double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
                // above diskSpaceWarningLevelRatio (0.90 by default) the disk is nearly full: mark it as not writable
                if (physicRatio > diskSpaceWarningLevelRatio) {
                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                    if (diskok) {
                        DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
                    }

                    cleanImmediately = true;
                    // above diskSpaceCleanForciblyRatio (0.85 by default): clean immediately, but writes are not rejected
                } else if (physicRatio > diskSpaceCleanForciblyRatio) {
                    cleanImmediately = true;
                } else {
                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                    if (!diskok) {
                        DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
                    }
                }

                if (physicRatio < 0 || physicRatio > ratio) {
                    DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
                    return true;
                }
            }
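The thresholds in isSpaceToDelete can be summarized as a small decision function. The defaults used here (0.90 warning level, 0.85 forced-clean level, diskMaxUsedSpaceRatio = 75) are the values I know from RocketMQ's source and are configurable; DiskUsagePolicy is hypothetical and only classifies a usage ratio.

```java
// Hypothetical summary of the disk-usage decisions made in isSpaceToDelete().
final class DiskUsagePolicy {
    enum Action { OK, CLEAN, CLEAN_IMMEDIATELY, CLEAN_IMMEDIATELY_AND_REJECT_WRITES }

    static final double WARNING_LEVEL = 0.90;   // diskSpaceWarningLevelRatio
    static final double CLEAN_FORCIBLY = 0.85;  // diskSpaceCleanForciblyRatio
    static final double MAX_USED = 75 / 100.0;  // diskMaxUsedSpaceRatio is given in percent

    static Action decide(double usedRatio) {
        if (usedRatio > WARNING_LEVEL) {
            return Action.CLEAN_IMMEDIATELY_AND_REJECT_WRITES; // disk marked full
        }
        if (usedRatio > CLEAN_FORCIBLY) {
            return Action.CLEAN_IMMEDIATELY;                  // writes still accepted
        }
        if (usedRatio < 0 || usedRatio > MAX_USED) {
            return Action.CLEAN;                              // isSpaceToDelete() returns true
        }
        return Action.OK;
    }
}
```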
  • Deletion
 public int deleteExpiredFile(
        final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately
    ) {
        return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
    }

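// ------------------------------------------------ file destruction and deletion
// A file that has expired (fileReservedTime, 72 hours by default) or must go because of disk space (cleanImmediately) is deleted via MappedFile.destroy()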
     for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                    if (mappedFile.destroy(intervalForcibly)) {
                        files.add(mappedFile);
                        deleteCount++;

                        if (files.size() >= DELETE_FILES_BATCH_MAX) {
                            break;
                        }

                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //avoid deleting files in the middle
                    break;
                }
            }
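The loop above only ever deletes a prefix of the file list: it stops at the first file that has not expired (unless cleanImmediately is set), at the first file that cannot be destroyed yet, or at a batch limit. A minimal sketch over last-modified timestamps; ExpiredFileSelector is hypothetical, destroy() is assumed to always succeed, there is no sleep between deletions, and the batch limit of 10 mirrors DELETE_FILES_BATCH_MAX as I recall it. As I recall, mfsLength in the original is the file count minus one, so the file currently being written is never considered; the sketch does the same.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of deleteExpiredFileByTime(): which files (oldest first) would be deleted.
final class ExpiredFileSelector {
    static final int DELETE_FILES_BATCH_MAX = 10; // assumed batch limit

    static List<Integer> select(List<Long> lastModified, long expiredTimeMillis,
                                long nowMillis, boolean cleanImmediately) {
        List<Integer> toDelete = new ArrayList<>();
        // Skip the last (currently written) file.
        for (int i = 0; i < lastModified.size() - 1; i++) {
            long liveMaxTimestamp = lastModified.get(i) + expiredTimeMillis;
            if (nowMillis < liveMaxTimestamp && !cleanImmediately) {
                break;                      // avoid deleting files in the middle
            }
            toDelete.add(i);
            if (toDelete.size() >= DELETE_FILES_BATCH_MAX) {
                break;                      // limit the work done in one run
            }
        }
        return toDelete;
    }
}
```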