【问题标题】:Rabbitmq pika on multi process and multi threaded architectureRabbitmq pika 关于多进程和多线程架构
【发布时间】:2021-04-21 04:55:37
【问题描述】:

我有以下情况。我有一个

  1. 基于 python 的网络服务器(发布)-(单进程和多线程)
  2. 计划的作业(发布)(单进程和多线程运行不同的作业)
  3. rabbitmq 队列中的消费者(订阅rabbitmq 主题)(单进程多线程消费不同消息)

我目前正在尝试将 rabbitmq 用于上述堆栈。因此,对于上述情况,我想为每个进程创建单个 rabbitmq 连接并使用多个通道来支持多线程任务。 Rabbitmq 文档说使用多个通道来支持多个线程很好。但是 pika 库似乎不支持这种情况。您可以参考我尝试过的以下示例

import pika
import threading
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))


def test_thread(a: int):
    channel = connection.channel()
    channel.exchange_declare(exchange='normal_ex', exchange_type='topic')
    channel.basic_publish(exchange='normal_ex', routing_key='test', body=str(a))


for i in range(0, 10):
    t = threading.Thread(target=test_thread, args=[i])
    t.start()


time.sleep(10)
connection.close()

当我运行上述跨多个线程使用多个通道的程序时,我收到以下错误

流连接丢失:AssertionError(('_AsyncTransportBase._produce() tx 缓冲区大小下溢', -21, 1))

【问题讨论】:

    标签: multithreading rabbitmq pika


    【解决方案1】:

    来自https://pika.readthedocs.io/en/stable/faq.html#frequently-asked-questions

    Pika 线程安全吗?

    Pika 在代码中没有任何线程概念。如果您想将 Pika 与线程一起使用,请确保每个线程都有一个在该线程中创建的 Pika 连接。跨线程共享一个 Pika 连接是不安全的,但有一个例外:您可以从另一个线程调用连接方法 add_callback_threadsafe 以在活动的 pika 连接中安排回调。

    【讨论】:

      猜你喜欢
      • 2016-09-30
      • 1970-01-01
      • 2019-06-20
      • 1970-01-01
      • 1970-01-01
      • 2011-03-09
      • 2016-07-14
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多