Rabbitmq的使用以及其他RPC的实现

消息队列

1、两个服务调用(生产者消费者)

restful(http协议),rpc(远程过程调用)

2、解决的问题

  • 应用解耦

  • 流量削峰

  • 消息分发(发布订阅:观察者模式)

  • 异步消息(celery就是封装的消息队列)

常见消息队列及比较

Rabbitmq的使用以及其他RPC的实现

Rabbitmq和kafka的比较(主流)

rabbitmq:

吞吐量小,有消息确认机制,消费完了,告诉我才会删掉,涉及订单电商等对可靠性有要求

所谓吞入量小也是可以几百万条

kafka:

吞吐量高,注重高吞吐量,不注重消息可靠性,但这并不代表着不可靠,可能只是几亿条数据才会出现一两条,只是拿到就删掉

适用场景—日志—存在大量日志,一般是先存放到消息队列中,然后再持久化到硬盘中,而且一般不会立马存储到硬盘中而是屯个一两天再存

结论:

Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改成了主从结构,在事务性可靠性方面做了优化。广泛来说,电商、金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka

RPC框架

gRPC 可以跨语言

rpc就是两台不同机器间方法的调用,客户端1想调用服务端上的方法,于是便通过网络发送请求

,然后服务端执行方法,通过网络返回结果

Rabbitmq安装

一、原生安装

# 安装扩展源epel
# yum install epel-release -y
yum -y install erlang
yum -y install rabbitmq-server
systemctl start rabbitmq-server


rpm -qa rabbitmq-server

Rabbitmq的使用以及其他RPC的实现

二、docker拉取

# 这样拉取直接开启了web管理界面
docker pull rabbitmq:management

docker run -di --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:management

-e, --env=[]               指定环境变量,容器中可以使用该环境变量 
第一个-e 设置用户名
第二个-e 设置密码
# 关于为什么映射两个端口?
# 生产消费
5672是默认的端口
15672是web管理界面的端口

Rabbitmq的使用以及其他RPC的实现

创建用户名和密码

# 创建用户
rabbitmqctl add_user lqz 123
# 分配权限
rabbitmqctl set_user_tags lqz administrator
rabbitmqctl set_permissions -p "/" lqz ".*" ".*" ".*"

简单的生产者消费者模型

RabbiMq官网解析

一、生产者

import pika


# 用docker拉起需要用户名密码
credentials = pika.PlainCredentials(username="admin",password="123456")
# 建立连接,拿到连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='139.9.67.5', port=5673,credentials=credentials))


# 拿到channel对象
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue="hello")  # 指定队列名字

# 生产者向队列中放一条消息
channel.basic_publish(exchange="",
                      routing_key="hello",  # 队列名字
                      body=b"Hello World!")

print("Sent Hello World!")

connection.close()

存放多个以后可以在客户端查看

Rabbitmq的使用以及其他RPC的实现

二、消费者

从队列接收消息更为复杂。它的工作方式是将回调函数订阅到队列。每当我们收到消息时,此回调函数都会由 Pika 库调用。在我们的情况下,此功能将在屏幕上打印消息的内容。

# 消费者


import pika, sys, os


def main():
    credentials = pika.PlainCredentials("admin", "123456")
    connection = pika.BlockingConnection(pika.ConnectionParameters('139.9.67.5', 5673, credentials=credentials))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    
    # auto_ack=True消息队列收到确认就会把消费过的消息删除,但是一般不这样使用,接收到就确认,消费成功与否未知
    # channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    # 如果是false不会自动确认消息
    channel.basic_consume(queue='First', on_message_callback=callback, auto_ack=False)
    
    channel.start_consuming()


if __name__ == '__main__':
    main()

一般使用auto_ack是在回调函数中的,这样是确认被处理完了在返回确认消息,没有确认消息不会丢

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        # 真正的消息处理完了,再发确认
        ch.basic_ack(delivery_tag=method.delivery_tag)
            ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
    channel.basic_consume(queue='lqz', on_message_callback=callback, auto_ack=False)

Rabbitmq的使用以及其他RPC的实现

三、持久化

在创建队列时指定参数等于True durable=True,对已存在的队列无效,生产消费都需要,但这样只是队列持久化

# 生产者
import pika

# 用docker拉起需要用户名密码
credentials = pika.PlainCredentials(username="admin", password="123456")
# 建立连接,拿到连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='139.9.67.5', port=5673, credentials=credentials))

# 拿到channel对象
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue="Two", durable=True)  # 指定队列名字

# 生产者向队列中放一条消息
for i in range(3):
    channel.basic_publish(exchange="",
                          routing_key="Two",  # 队列名字
                          body=b"Two",
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化
                          ))

print("Sent Two")

connection.close()
# 消费者

channel.queue_declare(queue='Two',durable=True)

四、闲置消费

正常情况下是分批次消费,也就是按顺序消费三个消费者

Rabbitmq的使用以及其他RPC的实现

均匀分配

Rabbitmq的使用以及其他RPC的实现

但是这样有个问题,有些任务中,处理起来慢,消费者1积压了很多任务,但是消费者50却没有任务了,如果再按照原来的分配效率就会变慢,那么就应该让空闲的来工作

配置闲置消费也容易,只需要在消费者中配置一行就可以了

channel.basic_qos(prefetch_count=1)

### 消费者1
import pika, sys, os

def main():
    # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()

    channel.queue_declare(queue='lqz')

    def callback(ch, method, properties, body):
        import time
        time.sleep(50)
        print(" [x] Received %r" % body)
        # 真正的消息处理完了,再发确认
        ch.basic_ack(delivery_tag=method.delivery_tag)
    ## 不会自动回复确认消息,
    ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
    channel.basic_qos(prefetch_count=1)  #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
    channel.basic_consume(queue='lqz', on_message_callback=callback, auto_ack=False)

    channel.start_consuming()

if __name__ == '__main__':

    main()

五、发布订阅

多个订阅者订阅你的消息,这样发布者只需要发布一次消息,所有只要订阅的人都可以收到消息,也就是多个消费者监听不同的队列,生产者利用exchange将消息复制到监听的队列

Rabbitmq的使用以及其他RPC的实现

5.1基本使用

## 发布者
import pika


credentials = pika.PlainCredentials("admin", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('139.9.67.5', 5673,credentials=credentials))
channel = connection.channel()

# 声明队列没有指定名字,指定了exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=b'message')
print(" [x] Sent %r" % message)
connection.close()

订阅者启动多次会绑定不同的队列,但是都绑定到exchange上

## 订阅者(启动多次,会创建出多个队列,都绑定到了同一个exchange上)
import pika

credentials = pika.PlainCredentials("admin", "123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('139.9.67.5', 5673,credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
# 名字自启
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

5.2 关键字监听(监听多个)

主要就是将exchange_type修改为'direct',发布者和监听者都要修改,监听者监听多个

### 发布者
import pika


credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

# 声明队列没有指定名字,指定了exchange
channel.exchange_declare(exchange='lqz123', exchange_type='direct')

message = "info: asdfasdfasdfsadfasdf World!"
channel.basic_publish(exchange='lqz123', routing_key='bnb', body=message)
print(" [x] Sent %r" % message)
connection.close()


### 订阅者1
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='lqz123', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='lqz123', queue=queue_name,routing_key='nb')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()


####订阅者2
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='lqz123', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 配置项越多,监听的越多
channel.queue_bind(exchange='lqz123', queue=queue_name,routing_key='nb')
channel.queue_bind(exchange='lqz123', queue=queue_name,routing_key='bnb')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

5.3 模糊匹配

模糊匹配router_key监听的队列

当我们监听的关键字出现routing_key='qqq.dd'这种我们采取的办法可以直接配置,也可以采取模糊匹配,这个时候#*都能匹配到,但是如果出现routing_key='qqq.dd.xxx'这个时候就只有#能匹配到了

#表示后面可以跟任意字符

*表示后面只能跟一个单词


###发布者
import pika


credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

# 声明队列没有指定名字,指定了exchange
channel.exchange_declare(exchange='m3', exchange_type='topic')

message = "info: asdfasdfasdfsadfasdf World!"
channel.basic_publish(exchange='m3', routing_key='lqz.dd', body=message)
print(" [x] Sent %r" % message)
connection.close()


### 订阅者1 
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='m3', queue=queue_name,routing_key='lqz.*')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()


###订阅者2 
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='m3', queue=queue_name,routing_key='lqz.#')
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

六、通过rabbitmq实现RPC

客户端通过rabbitmq调用,那么服务端就要一直运行,现在要调用服务端函数,那么客户端端就要把调用的函数和唯一uuid放在队列里,服务端就回到队列里取出调用的函数去执行,执行完成后再放回到队列里,然后客户端再去队列中拿到执行好的结果,然后外界看上去就是调用客户端自己的函数

服务端

#!/usr/bin/env python
import pika

# 建立连接
credentials = pika.PlainCredentials("admin", "123456")
connection = pika.BlockingConnection(
    pika.ConnectionParameters('111', 5673, credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)
    
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

客户端

## 客户端

import pika
import uuid


class FibonacciRpcClient(object):
    
    def __init__(self):
        # 建立连接
        self.credentials = pika.PlainCredentials("admin", "123456")
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('111', 5673, credentials=self.credentials))
        self.channel = self.connection.channel()
        # 声明了一个接受返回值的队列
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        
        # 进行消费
        self.channel.basic_consume(
            queue=self.callback_queue,
            # 数据拿回来后,调用函数--on_response
            on_message_callback=self.on_response,
            auto_ack=True)
    
    def on_response(self, ch, method, props, body):
        # 如果收到的ID和本机生成的相同,则返回的结果就是我想要的指令返回的结果
        if self.corr_id == props.correlation_id:
            # 重新赋值,终止死循环
            self.response = body
    
    # 调用远程方法
    def call(self, n):
        self.response = None
        # 生成一个uuid 作为唯一标识作为区分
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            # 消息发送队列
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                # 等到数据回来调用callback
                reply_to=self.callback_queue,
                # 指定id
                correlation_id=self.corr_id,
            ),
            body=str(n))
        # 一开始是none,等待响应完成收到返回的id也就是 on_response 有值停止
        while self.response is None:
            # 非阻塞版的start_consuming()
            self.connection.process_data_events()
        return int(self.response)


# 实例化对象
fibonacci_rpc = FibonacciRpcClient()

# 对象调用call方法,并传入参数
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)  # 外界看上去,就像调用本地的call()函数一样
print(" [.] Got %r" % response)

七、其他方式实现RPC

自带的:SimpleXMLRPCServer(数据包大,速度慢)

第三方:ZeroRPC(底层使用ZeroMQ和MessagePack,速度快,响应时间短,并发高),grpc(谷歌推出支持夸语言)

7.1SimpleXML实现RPC

服务端

# xml是python自带的框架 借助于xml 数据量大比较慢

### 服务端
from xmlrpc.server import SimpleXMLRPCServer


class RPCServer(object):
    
    def __init__(self):
        super(RPCServer, self).__init__()
        print(self)
        self.send_data = {'server:' + str(i): i for i in range(100)}
        self.recv_data = None
    
    # 获取数据
    def getObj(self):
        print('get data')
        return self.send_data
    # 发送数据
    def sendObj(self, data):
        print('send data')
        self.recv_data = data
        print(self.recv_data)


# SimpleXMLRPCServer
# 监听地址方便别人访问
server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)
server.register_introspection_functions()

# 需要传入类对象,如果写的是函数就把函数的内存地址传过去
server.register_instance(RPCServer())

# 服务端需要一会服务等待调用
server.serve_forever()

客户端

### 客户端
import time
from xmlrpc.client import ServerProxy


# SimpleXMLRPCServer
def xmlrpc_client():
    print('xmlrpc client')
    # 拿到监听的对象
    c = ServerProxy('http://localhost:4242')
    data = {'client:' + str(i): i for i in range(100)}
    start = time.time()
    for i in range(50):
        # 直接调用监听对象中的类方法
        a = c.getObj()
        print(a)
    for i in range(50):
        c.sendObj(data)
    print('xmlrpc total time %s' % (time.time() - start))


if __name__ == '__main__':
    xmlrpc_client()

Rabbitmq的使用以及其他RPC的实现

7.2 ZeroRPC

服务端

### 服务端
import zerorpc


class RPCServer(object):
    
    def __init__(self):
        super(RPCServer, self).__init__()
        print(self)
        self.send_data = {'server:' + str(i): i for i in range(100)}
        self.recv_data = None
    
    def getObj(self):
        print('get data')
        return self.send_data
    
    def sendObj(self, data):
        print('send data')
        self.recv_data = data
        print(self.recv_data)


# zerorpc
s = zerorpc.Server(RPCServer())
s.bind('tcp://0.0.0.0:4243')
s.run()

客户端

# zerorpc
def zerorpc_client():
    print('zerorpc client')
    c = zerorpc.Client()
    c.connect('tcp://127.0.0.1:4243')
    data = {'client:' + str(i): i for i in range(100)}
    start = time.time()
    for i in range(500):
        a = c.getObj()
        print(a)
    for i in range(500):
        c.sendObj(data)
    
    print('total time %s' % (time.time() - start))


if __name__ == '__main__':
    zerorpc_client()

Rabbitmq的使用以及其他RPC的实现

还有一个是传输格式是json,但是比grpc要慢一点,用为GRPC包装数据时使用的的protobuf,数据更小

上一篇:消息队列Rabbitmq


下一篇:RabbitMQ