【发布时间】:2017-07-03 11:26:48
【问题描述】:
我实现了一个 RabbitMQ(版本 3.2.4)异步使用者(如 here 所述)并侦听队列/路由键,并且在我最近进行一些更改之前一直运行没有任何问题。
某些任务很耗时,因此我决定使用多处理库来分离子进程,这些子进程使用多处理队列/池设计来执行这些密集型任务,这样我的主要任务就可以在没有任何等待的情况下执行。
my_queue = multiprocessing.Queue()
my_pool = multiprocessing.Pool(2, my_method, (my_queue,))
一旦队列和池被初始化,我在初始化消费者时将队列作为参数传递(ExampleConsumer 的__init__ 方法,如上面的示例链接中所示)。然后,在on_message 方法中,我将消息推送到my_queue 以执行耗时的任务。
编辑:
一些代码示例:
def main():
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
my_queue = multiprocessing.Queue()
my_pool = multiprocessing.Pool(2, my_class().my_method, (my_queue,))
example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F', my_queue)
try:
example.run()
my_pool.close()
my_pool.join()
except KeyboardInterrupt:
my_pool.terminate()
example.stop()
消费者的init方法和on_message方法:
def __init__(self, amqp_url, queue):
"""Create a new instance of the consumer class, passing in the AMQP
URL used to connect to RabbitMQ.
:param str amqp_url: The AMQP url to connect with
"""
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
self.queue = queue
def on_message(self, unused_channel, basic_deliver, properties, body):
"""Invoked by pika when a message is delivered from RabbitMQ. The
channel is passed for your convenience. The basic_deliver object that
is passed in carries the exchange, routing key, delivery tag and
a redelivered flag for the message. The properties passed in is an
instance of BasicProperties with the message properties and the body
is the message that was sent.
:param pika.channel.Channel unused_channel: The channel object
:param pika.Spec.Basic.Deliver: basic_deliver method
:param pika.Spec.BasicProperties: properties
:param str|unicode body: The message body
"""
LOGGER.info('Received message # %s from %s: %s',
basic_deliver.delivery_tag, properties.app_id, body)
self.acknowledge_message(basic_deliver.delivery_tag)
self.queue.put(str(body))
进行这些更改后,我开始看到以下类型的异常:
File "consumer_new.py", line 500, in run
self._connection.ioloop.start()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 355, in start
self.process_timeouts()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 283, in process_timeouts
timer['callback']()
File "consumer_new.py", line 290, in reconnect
self._connection.ioloop.start()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 354, in start
self.poll()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 602, in poll
self._process_fd_events(fd_event_map, write_only)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 443, in _process_fd_events
handler(fileno, events, write_only=write_only)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 364, in _handle_events
self._handle_read()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 415, in _handle_read
self._on_data_available(data)
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1347, in _on_data_available
self._process_frame(frame_value)
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1427, in _process_frame
self._deliver_frame_to_channel(frame_value)
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1028, in _deliver_frame_to_channel
return self._channels[value.channel_number]._handle_content_frame(value)
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 896, in _handle_content_frame
self._on_deliver(*response)
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 983, in _on_deliver
header_frame.properties, body)
File "consumer_new.py", line 452, in on_message
self.acknowledge_message(basic_deliver.delivery_tag)
File "consumer_new.py", line 463, in acknowledge_message
self._channel.basic_ack(delivery_tag)
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 159, in basic_ack
return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 1150, in _send_method
self.connection._send_method(self.channel_number, method_frame, content)
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1569, in _send_method
self._send_frame(frame.Method(channel_number, method_frame))
File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1554, in _send_frame
self._flush_outbound()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 282, in _flush_outbound
self._handle_write()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 452, in _handle_write
return self._handle_error(error)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 338, in _handle_error
self._handle_disconnect()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 288, in _handle_disconnect
self._adapter_disconnect()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 94, in _adapter_disconnect
self.ioloop.remove_handler(self.socket.fileno())
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 579, in remove_handler
super(PollPoller, self).remove_handler(fileno)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 328, in remove_handler
self.update_handler(fileno, 0)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 571, in update_handler
self._poll.modify(fileno, events)
IOError: [Errno 9] Bad file descriptor
run() 方法在没有任何干预的情况下继续在主进程中运行。如果是这种情况,我不明白为什么会出现 Bad File Descriptor 错误,因为没有其他人可以关闭 rmq 连接。此外,由于上述原因,消费者似乎在失败之前运行了 3-4 小时没有任何问题。
如果文件描述符数量不足,我在 Rabbitmq UI 上进行了检查。但这似乎不是问题。我无法找到可能是什么问题的线索。
感谢任何帮助!谢谢。
【问题讨论】:
标签: python rabbitmq python-multiprocessing pika