【问题标题】:RMQ asynchronous consumer : pika.adapters.base_connection, _handle_error, Fatal Socket Error: error(9, 'Bad file descriptor')RMQ 异步消费者:pika.adapters.base_connection、_handle_error、致命套接字错误:错误(9,'Bad file descriptor')
【发布时间】:2017-07-03 11:26:48
【问题描述】:

我实现了一个 RabbitMQ(版本 3.2.4)异步使用者(如 here 所述)并侦听队列/路由键,并且在我最近进行一些更改之前一直运行没有任何问题。

某些任务很耗时,因此我决定使用多处理库来分离子进程,这些子进程使用多处理队列/池设计来执行这些密集型任务,这样我的主要任务就可以在没有任何等待的情况下执行。

my_queue = multiprocessing.Queue()
my_pool = multiprocessing.Pool(2, my_method, (my_queue,))

一旦队列和池被初始化,我在初始化消费者时将队列作为参数传递(ExampleConsumer__init__ 方法,如上面的示例链接中所示)。然后,在on_message 方法中,我将消息推送到my_queue 以执行耗时的任务。

编辑:

一些代码示例:

def main():
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    my_queue = multiprocessing.Queue()
    my_pool = multiprocessing.Pool(2, my_class().my_method, (my_queue,))
    example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F', my_queue)
    try:
        example.run()
        my_pool.close()
        my_pool.join()
    except KeyboardInterrupt:
        my_pool.terminate()
        example.stop()

消费者的init方法和on_message方法:

def __init__(self, amqp_url, queue):
        """Create a new instance of the consumer class, passing in the AMQP
        URL used to connect to RabbitMQ.

        :param str amqp_url: The AMQP url to connect with

        """
        self._connection = None
        self._channel = None
        self._closing = False
        self._consumer_tag = None
        self._url = amqp_url
        self.queue = queue

def on_message(self, unused_channel, basic_deliver, properties, body):
        """Invoked by pika when a message is delivered from RabbitMQ. The
        channel is passed for your convenience. The basic_deliver object that
        is passed in carries the exchange, routing key, delivery tag and
        a redelivered flag for the message. The properties passed in is an
        instance of BasicProperties with the message properties and the body
        is the message that was sent.

        :param pika.channel.Channel unused_channel: The channel object
        :param pika.Spec.Basic.Deliver: basic_deliver method
        :param pika.Spec.BasicProperties: properties
        :param str|unicode body: The message body

        """
        LOGGER.info('Received message # %s from %s: %s',
                    basic_deliver.delivery_tag, properties.app_id, body)
        self.acknowledge_message(basic_deliver.delivery_tag)
        self.queue.put(str(body))

进行这些更改后,我开始看到以下类型的异常:

File "consumer_new.py", line 500, in run
    self._connection.ioloop.start()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 355, in start
    self.process_timeouts()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 283, in process_timeouts
    timer['callback']()
  File "consumer_new.py", line 290, in reconnect
    self._connection.ioloop.start()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 354, in start
    self.poll()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 602, in poll
    self._process_fd_events(fd_event_map, write_only)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 443, in _process_fd_events
    handler(fileno, events, write_only=write_only)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 364, in _handle_events
    self._handle_read()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 415, in _handle_read
    self._on_data_available(data)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1347, in _on_data_available
    self._process_frame(frame_value)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1427, in _process_frame
    self._deliver_frame_to_channel(frame_value)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1028, in _deliver_frame_to_channel
    return self._channels[value.channel_number]._handle_content_frame(value)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 896, in _handle_content_frame
    self._on_deliver(*response)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 983, in _on_deliver
    header_frame.properties, body)
  File "consumer_new.py", line 452, in on_message
    self.acknowledge_message(basic_deliver.delivery_tag)
  File "consumer_new.py", line 463, in acknowledge_message
    self._channel.basic_ack(delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 159, in basic_ack
    return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 1150, in _send_method
    self.connection._send_method(self.channel_number, method_frame, content)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1569, in _send_method
    self._send_frame(frame.Method(channel_number, method_frame))
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1554, in _send_frame
    self._flush_outbound()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 282, in _flush_outbound
    self._handle_write()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 452, in _handle_write
    return self._handle_error(error)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 338, in _handle_error
    self._handle_disconnect()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 288, in _handle_disconnect
    self._adapter_disconnect()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 94, in _adapter_disconnect
    self.ioloop.remove_handler(self.socket.fileno())
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 579, in remove_handler
    super(PollPoller, self).remove_handler(fileno)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 328, in remove_handler
    self.update_handler(fileno, 0)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 571, in update_handler
    self._poll.modify(fileno, events)
IOError: [Errno 9] Bad file descriptor

run() 方法在没有任何干预的情况下继续在主进程中运行。如果是这种情况,我不明白为什么会出现 Bad File Descriptor 错误,因为没有其他人可以关闭 rmq 连接。此外,由于上述原因,消费者似乎在失败之前运行了 3-4 小时没有任何问题。

如果文件描述符数量不足,我在 Rabbitmq UI 上进行了检查。但这似乎不是问题。我无法找到可能是什么问题的线索。

感谢任何帮助!谢谢。

【问题讨论】:

    标签: python rabbitmq python-multiprocessing pika


    【解决方案1】:

    Pika 不是线程安全的。它在文档中说得很清楚。如果您对线程或子进程中的连接或通道进行任何操作,各种事情最终都会出错,并且您的程序将崩溃并出现奇怪和无信息的错误。它可能会工作一段时间,但最终 Pika 结构会被破坏。

    如果您需要多处理和rabbitmq,您有几个选择。

    1. 使用 rabbitpy 而不是 Pika。我没有使用它,所以我无法评论它对你的适用性,但它是线程安全的。
    2. 如果可以,请将任务分开,以便您的子进程可以打开自己的 Pika 连接。如果您的主程序收到请求,有一个子进程来处理它然后发送结果,这将不起作用。例如,如果您需要发送确认,则不能让您的子流程确认在主流程中收到的消息。
    3. 从子进程中删除 Pika。如果您的子流程的想法是向它们分派计算或耗时的任务,您可以尝试创建两个队列:一个用于子流程输入,一个用于输出,并让您的子流程将结果返回到队列中的主程序。那么主程序就可以根据这个来处理rabbitmq的流量了。
    4. 如果您的程序是处理请求的某种服务器,请将所有内容拆分为子进程(“工作队列”-模型)https://www.rabbitmq.com/tutorials/tutorial-two-python.html,并让每个子进程作为消费者独立订阅队列。 Rabbitmq 负责循环调度,通过限制预取,您可以使其子进程只选择一个任务,并且在该任务的处理完成之前,它不会选择其他任何东西,确保在第一个任务之后立即发送一个将被空闲线程或子进程拾取。在这个模型中,你的主进程根本不需要 Pika 连接,并且每个子进程都有一个独立的连接,如 2)。

    希望这会有所帮助。

    汉努

    【讨论】:

    • 我还有一个问题。根据 pika 文档,如果我们不跨线程/进程共享 pika 连接,那么应该没问题。我只是将多处理队列传递给具有主要消费者逻辑的方法,并从那里将元素推送到该队列。这种类型的操作是否会影响线程安全,因为我没有跨线程传递 pika 连接?感谢您的宝贵时间!
    • 我不太明白你在线程中做了什么。您推送到队列中的这些“元素”是什么?如果它们是您的 on_response 回调中已经提取的数据,那很好。如果是rabbitmq或者message对象,那就不行了。因此,如果您在 on_response(ch, method, props, body) 函数中执行类似 queue.put(json.loads(body)) 的操作,假设您收到 json 数据,那应该没问题。
    • 您是否在您的线程中发送消息确认?或者提供对请求的响应?还是他们只是盲目地处理请求而不以任何方式与rabbitmq交互?如果是,那么您可能遇到了错误。如果你的线程通过 Pika 以某种方式访问​​ rabbitmq,我 99% 确定这里存在线程安全问题。
    • queue.put(json.loads(body)) 正是我在 on_message(unused_channel, basic_deliver, properties, body) 方法中所做的。 Queue 在 main 方法中声明并作为参数传递给 init 方法,稍后在 on_message 方法中使用。
    • 那么基于此,我认为我无法提供更多帮助。如果我的理解是正确的,那么您的代码应该没问题。如果您能够共享代码的任何部分,您可能希望在 Pika git 页面中打开支持票证。它背后的人反应很快,但他们需要先看一些代码才能做很多事情。
    猜你喜欢
    • 2013-06-16
    • 2021-12-05
    • 1970-01-01
    • 2018-12-21
    • 2020-12-04
    • 1970-01-01
    • 2013-12-31
    • 1970-01-01
    • 2019-09-12
    相关资源
    最近更新 更多