RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。
实现的协议:AMQP。
术语(Jargon)
P,Producing,制造和发送信息的一方。
Queue,消息队列。
C,Consuming,接收消息的一方。
RabbitMQ安装
1 安装配置epel源 2 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 3 4 安装erlang 5 $ yum -y install erlang 6 7 安装RabbitMQ 8 $ yum -y install rabbitmq-server
安装rabbitmq API
1 pip install pika 2 or 3 easy_install pika 4 or 5 源码 6 7 https://pypi.python.org/pypi/pika
使用API操作RabbitMQ
基于Queue实现生产者消费者模型
1 #!/usr/bin/env python 2 import pika 3 4 # ######################### 生产者 ######################### 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host='localhost')) 8 channel = connection.channel() 9 10 channel.queue_declare(queue='hello') 11 12 channel.basic_publish(exchange='', 13 routing_key='hello', 14 body='Hello World!') 15 print(" [x] Sent 'Hello World!'") 16 connection.close()
1 #!/usr/bin/env python 2 import pika 3 4 # ########################## 消费者 ########################## 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host='localhost')) 8 channel = connection.channel() 9 10 channel.queue_declare(queue='hello') 11 12 def callback(ch, method, properties, body): 13 print(" [x] Received %r" % body) 14 15 channel.basic_consume(callback, 16 queue='hello', 17 no_ack=True) 18 19 print(' [*] Waiting for messages. To exit press CTRL+C') 20 channel.start_consuming()
1、acknowledgment 消息不丢失(订阅端消息不丢失)
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq 8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错 9 10 def callback(ch,method,properties,body): 11 print("[x] Received %r" %body)#打印获得消息的内容 12 ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息 13 14 channel.basic_consume(callback,queue='hai',no_ack=False) 15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息, 16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认, 17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 18 19 print('[*]Waiting for messages to exit press CTRL+C') 20 channel.start_consuming()
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq 8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai 9 channel.basic_publish(exchange='',routing_key='hai',body='hello world') 10 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容 11 print("[x] Sent 'hello world' ") 12 connection.close()