内容目录:
- rabbitMQ
- python操作mysql,pymysql模块
- Python ORM框架,SQLAchemy模块
- Paramiko
- 其他with上下文切换
rabbitMQ
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
1、基本安装配置使用
安装
ubuntu系统上安装rabbitmq sudo apt install erlang sudo apt install rabbitmq-server #拷贝配置文件,否则连接rabbitmq会报错 cp /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz /etc/rabbitmq/ cd /etc/rabbitmq/ gunzip rabbitmq.config.example.gz /etc/init.d/rabbitmq-server start #启动 centos上 安装配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang $ yum -y install erlang 安装RabbitMQ $ yum -y install rabbitmq-server service rabbitmq-server start/stop python操作的客户端需要安装pika模块来操作rabbitmq python3 -m pip install pika
使用
基于queue方式实现的消费者和生产者模型
import queue
import threading
import time
message = queue.Queue(10)
def producer(i):#生产者方法
while True:
message.put('生产--'+str(i)) #添加到队列
time.sleep(1)
def consumer(i):#消费者方法
msg = message.get() #从队列中取消息
print(msg)
for i in range(3):#创建2个任务线程来生产
t = threading.Thread(target=producer, args=(i,))
t.start()
for i in range(10):#创建5个消费者任务来消费
t = threading.Thread(target=consumer, args=(i,))
t.start()
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.139.137')) #创建连接
channel = connection.channel() #创建频道
channel.queue_declare(queue='hello') #定义队列名,和订阅者的一致
channel.basic_publish(exchange='',routing_key='hello',
body='Hello World!')#发布消息
print(" [x] Sent 'Hello World!'")
connection.close()
消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.139.137')) #建立连接
channel = connection.channel()#创建频道
channel.queue_declare\
(queue='hello') #定义一个rabbitMQ下的一个队列名,与发布者一致
def callback(ch, method, properties, body):#回调函数
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)#指定队列名核函数准备执行接收消息
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()#开始接收消息
2、acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
我们在操作中只需在订阅者上添加no-ack=False设置就行,生产者不变
消费者代码:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.139.137')) #建立连接
channel = connection.channel()#创建频道
channel.queue_declare\
(queue='hello') #定义一个rabbitMQ下的一个队列名
def callback(ch, method, properties, body):#回调函数
print(" [x] Received %r" % body)
time.sleep(3)
print('ok')
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello',
no_ack=False)#此处设置no_ack=False
# 表示中途断开rabbitmq会将将改任务添加到队列中
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()#开始接收消息
3、durable 消息不丢失
生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.139.137')) #创建连接
channel = connection.channel() #创建频道
channel.queue_declare(queue='hello',
durable=True) #定义队列名,和接收者的一致
channel.basic_publish(exchange='',routing_key='hello',
properties=pika.BasicProperties
(delivery_mode=2),#make message persistent
body='Hello World!') #发布消息
print(" [x] Sent 'Hello World!'")
connection.close()
消费者
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.139.137')) #建立连接
channel = connection.channel()#创建频道
channel.queue_declare(queue='hello',
durable=True) #定义一个rabbitMQ下的一个队列名
def callback(ch, method, properties, body):#回调函数
print(" [x] Received %r" % body)
time.sleep(3)
print('ok')
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello',
no_ack=False)#此处设置no_ack=False
# 表示中途断开rabbitmq会将将改任务添加到队列中
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()#开始接收消息
4、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者2去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
订阅者代码和发布者均取消durable=True部分,测试时候只要开启多个订阅者执行就能看到第一个订阅者收第一个消息,第二个订阅者接收第二条消息。。。
消费者
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.139.137')) #建立连接 channel = connection.channel()#创建频道 #channel.queue_declare(queue='hello',durable=True) #定义一个rabbitMQ下的一个队列名 channel.queue_declare(queue='hello') #定义一个rabbitMQ下的一个队列名 def callback(ch, method, properties, body):#回调函数 print(" [x] Received %r" % body) time.sleep(3) print('ok') ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,queue='hello',no_ack=False)#此处设置no_ack=False 表示中途断开rabbitmq会将将改任务添加到队列中 print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()#开始接收消息