ActiveMQ(九):消息存储与可持久化

1.简介

1.1此处持久化和之前的持久化的区别

ActiveMQ(九):消息存储与可持久化
MQ高可用:事务、可持久、签收,是属于MQ自身特性,自带的。这里的持久化是外力,是外部插件。之前讲的持久化是MQ的外在表现,现在讲的的持久是是底层实现。

1.2持久化是什么?

持久化是什么?一句话就是:ActiveMQ宕机了,消息不会丢失的机制。
说明:为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

2.持久化的方式

2.1AMQ Message Store

基于文件的存储机制,是以前的默认机制,现在不再使用。
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

2.2kahaDB

基于日志文件,从ActiveMQ5.4(含)开始默认的持久化插件。
官网文档:http://activemq.aache.org/kahadb,官网上还有一些其他配置参数。
配置文件activemq.xml中,如下

   <persistenceAdapter>
         <kahaDB directory="${activemq.data}/kahadb"/>
   </persistenceAdapter>

日志文件的存储目录在:%activemq安装目录%/data/kahadb
ActiveMQ(九):消息存储与可持久化
ActiveMQ(九):消息存储与可持久化
ActiveMQ(九):消息存储与可持久化

2.3JDBC消息存储

2.3.1原理图

ActiveMQ(九):消息存储与可持久化

2.3.2添加mysql数据库的驱动包到lib文件夹

2.3.3 jdbcPersistenceAdapter配置

ActiveMQ(九):消息存储与可持久化
修改配置文件activemq.xml。将之前的替换为jdbc的配置。如下:

<!--  
<persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
      </persistenceAdapter>
-->
<persistenceAdapter>  
      <jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/> 
</persistenceAdapter>

2.3.4 数据库连接池配置

需要我们准备一个mysql数据库,并创建一个名为activemq的数据库。在标签和标签之间插入数据库连接池配置。
ActiveMQ(九):消息存储与可持久化
之后需要建一个数据库,名为activemq。新建的数据库要采用latin1 或者ASCII编码
默认是的dbcp数据库连接池,如果要换成其他数据库连接池,需要将该连接池jar包,也放到lib目录下。

2.3.5建库SQL和创表说明

重启activemq。会自动生成如下3张表。如果没有自动生成,需要我们手动执行SQL。如果实在不行,下面是手动建表的SQL:

-- auto-generated definition
create table ACTIVEMQ_ACKS
(
    CONTAINER     varchar(250)     not null comment '消息的Destination',
    SUB_DEST      varchar(250)     null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',
    CLIENT_ID     varchar(250)     not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',
    SUB_NAME      varchar(250)     not null comment '订阅者名称',
    SELECTOR      varchar(250)     null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',
    LAST_ACKED_ID bigint           null comment '记录消费过消息的ID',
    PRIORITY      bigint default 5 not null comment '优先级,默认5',
    XID           varchar(250)     null,
    primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
    comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';

create index ACTIVEMQ_ACKS_XIDX
    on ACTIVEMQ_ACKS (XID);

 
-- auto-generated definition
create table ACTIVEMQ_LOCK
(
    ID          bigint       not null
        primary key,
    TIME        bigint       null,
    BROKER_NAME varchar(250) null
);

 
-- auto-generated definition
create table ACTIVEMQ_MSGS
(
    ID         bigint       not null
        primary key,
    CONTAINER  varchar(250) not null,
    MSGID_PROD varchar(250) null,
    MSGID_SEQ  bigint       null,
    EXPIRATION bigint       null,
    MSG        blob         null,
    PRIORITY   bigint       null,
    XID        varchar(250) null
);

create index ACTIVEMQ_MSGS_CIDX
    on ACTIVEMQ_MSGS (CONTAINER);

create index ACTIVEMQ_MSGS_EIDX
    on ACTIVEMQ_MSGS (EXPIRATION);

create index ACTIVEMQ_MSGS_MIDX
    on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);

create index ACTIVEMQ_MSGS_PIDX
    on ACTIVEMQ_MSGS (PRIORITY);

create index ACTIVEMQ_MSGS_XIDX
    on ACTIVEMQ_MSGS (XID);

ACTIVEMQ_MSGS数据表:
ActiveMQ(九):消息存储与可持久化
ACTIVEMQ_ACKS数据表:
ActiveMQ(九):消息存储与可持久化

ACTIVEMQ_LOCK数据表:
ActiveMQ(九):消息存储与可持久化

2.3.6总结

如果是queue:在没有消费者消费的情况下会将消息保存到activemq_msgs表中,消费之后这些消息会被立刻删除。
如果是topic:一般是先启动消费者订阅然后在生产的情况下将消息保存到activemq_acks。

  • 注意点1:要将使用到的jar文件房知道ActiveMQ安装目录下的lib中。包括mysql_jdbc和对应的连接池的jar包
  • 注意点2:在jdbcPersistenceAdapter标签中设置了createTableOnStartup属性为true时候,第一次启动ActiveMQ时,将会自动创建数据表,但是启动之后可以更改为false
  • 下划线坑爹:java.lang.IllageStateException:BeanFactory not initialized or aleady closed:这是因为您的操作机器中有_符号,更改机器名后重启就行。

2.4 LevelDB消息存储

过于新兴的技术,现在有些不确定。这种文件系统是ActiveMQ5.8之后引进的,他和KhaDB非常近似,也是基于文件的本地数据库的存储形式,但是他比KahaDB更快的持久性。他不使用自定义的B-Tree实现索引预写日志,而是使用levelDB索引。

<persistenceAdapter>
<levelDBdirectory="acivemq-data"/>
</persistenceAdapter>

2.5JDBC Message Store with ActiveMQ Journal

这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高。

下面是基于上面JDBC配置,再做一点修改:ActiveMQ(九):消息存储与可持久化
ActiveMQ(九):消息存储与可持久化

3.总结

① jdbc效率低,kahaDB效率高,jdbc+Journal效率较高。
② 持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。
③ 持久化机制演变的过程:
从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。

上一篇:Odometry的发布和发布odom到base_link的tf变换


下一篇:PHP数据库备份与恢复