RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

实现的协议:AMQP。
 
术语(Jargon)
 
P,Producing,制造和发送信息的一方。
Queue,消息队列。
C,Consuming,接收消息的一方。
 
RabbitMQ安装
1 安装配置epel源
2    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
3  
4 安装erlang
5    $ yum -y install erlang
6  
7 安装RabbitMQ
8    $ yum -y install rabbitmq-server

安装rabbitmq API

1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6  
7 https://pypi.python.org/pypi/pika

使用API操作RabbitMQ

基于Queue实现生产者消费者模型

 1 #!/usr/bin/env python
 2 import pika
 3  
 4 # ######################### 生产者 #########################
 5  
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7         host='localhost'))
 8 channel = connection.channel()
 9  
10 channel.queue_declare(queue='hello')
11  
12 channel.basic_publish(exchange='',
13                       routing_key='hello',
14                       body='Hello World!')
15 print(" [x] Sent 'Hello World!'")
16 connection.close()
 1 #!/usr/bin/env python
 2 import pika
 3  
 4 # ########################## 消费者 ##########################
 5  
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7         host='localhost'))
 8 channel = connection.channel()
 9  
10 channel.queue_declare(queue='hello')
11  
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14  
15 channel.basic_consume(callback,
16                       queue='hello',
17                       no_ack=True)
18  
19 print(' [*] Waiting for messages. To exit press CTRL+C')
20 channel.start_consuming()

 

1、acknowledgment 消息不丢失(订阅端消息不丢失)

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
 8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错
 9 
10 def callback(ch,method,properties,body):
11     print("[x] Received %r" %body)#打印获得消息的内容
12     ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
13 
14 channel.basic_consume(callback,queue='hai',no_ack=False)
15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
18 
19 print('[*]Waiting for messages to exit press CTRL+C')
20 channel.start_consuming()
消费者
 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
 8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai
 9 channel.basic_publish(exchange='',routing_key='hai',body='hello world')
10 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
11 print("[x] Sent 'hello world' ")
12 connection.close()
生产者

相关文章:

  • 2021-05-21
  • 2021-11-16
猜你喜欢
  • 2021-08-12
  • 2021-10-27
  • 2021-05-20
  • 2021-12-19
  • 2021-08-31
  • 2022-03-09
相关资源
相似解决方案