【问题标题】:Best Practice for Batch Processing with RabbitMQ使用 RabbitMQ 进行批处理的最佳实践
【发布时间】:2019-08-08 22:21:40
【问题描述】:

我正在寻找使用 Python 执行 ETL 的最佳方法。

我在 RabbitMQ 中有一个发送事件的通道(甚至可以每秒发送一次)。 我想处理每 1000 个。 主要问题是 RabbitMQ 接口(我正在使用 pika)在每条消息上都会引发回调。 我查看了 Celery 框架,但批处理功能在版本 3 中已被贬低。

最好的方法是什么?我正在考虑将我的事件保存在一个列表中,当它达到 1000 时将其复制到其他列表并执行我的处理。但是,如何使它成为线程安全的?不想丢事件,又怕在同步列表的时候丢事件。

这听起来像是一个非常简单的用例,但是我没有找到任何好的最佳实践。

【问题讨论】:

  • 我觉得这样做是个糟糕的设计!似乎效率不高。你为什么要这样设计?您不能使用多线程并让订阅者在独立线程中订阅事件并处理每个事件吗?
  • 这里是用例:我正在计算事件的聚合并将数据保存到数据库。所以在python运行时聚合1000个事件然后只更新数据库要快得多,否则更新数据库1000次(这将是多次更新)。
  • 你可以监听所有的事件,当你达到 1000 时你把它交给一个处理程序将它们存储在数据库中,你可以使用多线程

标签: python-3.x rabbitmq etl


【解决方案1】:

如何使它成为线程安全的?

如何设置消费者prefetch-count=1000。如果消费者的unack 消息达到其预取限制,rabbitmq 将不会向其传递任何消息。

不要ACK收到消息,直到您有1000条消息,然后将其复制到其他列表并执行您的处理。完成工作后,ACK 最后一条消息,all message before this message will be ACK by rabbitmq server

但我不确定大预取是否是最佳做法。

【讨论】:

    【解决方案2】:

    首先,你不应该“批处理”来自 RabbitMQ 的消息,除非你真的必须这样做。处理消息传递的最有效方法是独立处理每条消息。

    如果您需要批量合并消息,我会使用单独的数据存储来临时存储消息,然后在达到特定条件时对其进行处理。每次将项目添加到批处理时,您都会检查该条件(例如,您达到 1000 条消息)并触发批处理的处理。

    这比在内存中保存一个列表要好,因为如果你的服务死了,消息仍然会保存在数据库中。

    注意:如果每个队列有一个处理器,这可以在没有任何同步机制的情况下工作。如果您有多个处理器,则需要实现某种锁定机制。

    【讨论】:

    • 批处理可以在许多用例中发挥作用。例如,在数据科学中,批量进行预测可以获得巨大的性能提升。
    • 为什么使用单独的数据存储来临时存储消息比使用rabbit作为“数据存储”通过批量处理更好?
    猜你喜欢
    • 2017-04-27
    • 1970-01-01
    • 1970-01-01
    • 2011-10-23
    • 2015-09-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-08-25
    相关资源
    最近更新 更多