【发布时间】:2023-11-03 03:38:01
【问题描述】:
我正在寻找一种从我的 django 应用程序向 rabbitmq 服务器发布消息的方法。这不是用于任务卸载,所以我不想使用 Celery。目的是使用 django 应用程序发布到交易所,并在 docker 容器中使用该队列中的姐妹(非 django)应用程序。
这一切看起来都非常简单,但是,我似乎无法在每次不建立和关闭连接的情况下发布到交易所,即使没有明确要求这样做。
为了解决这个问题,我定义了一个带有嵌套单例类的类,该类使用 Pika 维护与 rabbitmq 服务器的连接。这个想法是嵌套的单例只会被实例化一次,并在那时声明连接。任何时候要将某些内容发布到队列中,单例都会处理它。
import logging
import pika
import os
logger = logging.getLogger('django')
class PikaChannelSingleton:
class __Singleton:
channel = pika.adapters.blocking_connection.BlockingChannel
def __init__(self):
self.initialize_connection()
def initialize_connection(self):
logger.info('Attempting to establish RabbitMQ connection')
credentials = pika.PlainCredentials(rmq_username, rmq_password)
parameters = pika.ConnectionParameters(rmq_host, rmq_port, rmq_vhost, credentials, heartbeat=0)
connection = pika.BlockingConnection(parameters)
con_chan = connection.channel()
con_chan.exchange_declare(exchange='xchng', exchange_type='topic', durable=True)
self.channel = con_chan
def send(self, routing_key, message):
if self.channel.is_closed:
PikaChannelSingleton.instance.initialize_connection()
self.channel.basic_publish(exchange='xchng', routing_key=routing_key,
body=message)
instance = None
def __init__(self, *args, **kwargs):
if not PikaChannelSingleton.instance:
logger.info('Creating channel singleton')
PikaChannelSingleton.instance = PikaChannelSingleton.__Singleton()
@staticmethod
def send(routing_key, message):
PikaChannelSingleton.instance.send(routing_key, message)
rmq_connection = PikaChannelSingleton()
然后我在 django 应用程序中需要的地方导入 rmq_connection。在玩具应用程序和 python repl 中一切正常,但每次在 django 应用程序中调用 send 函数时都会建立一个新连接。然后连接立即关闭,并显示消息“客户端意外关闭 TCP 连接”。消息确实会正确地发布到交易所。
所以我确信 django 发生了一些事情,以及它如何处理进程等。问题仍然存在,如何在不重新建立连接的情况下将大量消息发布到队列?
【问题讨论】:
标签: django docker rabbitmq amqp pika