【发布时间】:2019-08-08 22:21:40
【问题描述】:
我正在寻找使用 Python 执行 ETL 的最佳方法。
我在 RabbitMQ 中有一个发送事件的通道(甚至可以每秒发送一次)。 我想处理每 1000 个。 主要问题是 RabbitMQ 接口(我正在使用 pika)在每条消息上都会引发回调。 我查看了 Celery 框架,但批处理功能在版本 3 中已被贬低。
最好的方法是什么?我正在考虑将我的事件保存在一个列表中,当它达到 1000 时将其复制到其他列表并执行我的处理。但是,如何使它成为线程安全的?不想丢事件,又怕在同步列表的时候丢事件。
这听起来像是一个非常简单的用例,但是我没有找到任何好的最佳实践。
【问题讨论】:
-
我觉得这样做是个糟糕的设计!似乎效率不高。你为什么要这样设计?您不能使用多线程并让订阅者在独立线程中订阅事件并处理每个事件吗?
-
这里是用例:我正在计算事件的聚合并将数据保存到数据库。所以在python运行时聚合1000个事件然后只更新数据库要快得多,否则更新数据库1000次(这将是多次更新)。
-
你可以监听所有的事件,当你达到 1000 时你把它交给一个处理程序将它们存储在数据库中,你可以使用多线程
标签: python-3.x rabbitmq etl