【发布时间】:2017-01-29 12:32:15
【问题描述】:
我有一个反应器,它从 RabbitMQ 代理获取消息并触发工作方法以在进程池中处理这些消息,如下所示:
这是使用 python asyncio、loop.run_in_executor() 和 concurrent.futures.ProcessPoolExecutor 实现的。
现在我想使用 SQLAlchemy 在工作方法中访问数据库。大多数情况下,处理将是非常简单和快速的 CRUD 操作。
reactor 开始时每秒会处理 10-50 条消息,因此不能为每个请求都打开一个新的数据库连接。相反,我想为每个进程维护一个持久连接。
我的问题是:我该怎么做?我可以将它们存储在全局变量中吗? SQA 连接池会为我处理这个问题吗?反应堆停止时如何清理?
[更新]
- 数据库是带有 InnoDB 的 MySQL。
为什么选择这种带有进程池的模式?
当前实现使用不同的模式,每个消费者在自己的线程中运行。不知何故,这不是很好。已经有大约 200 个消费者在各自的线程中运行,并且系统正在快速增长。为了更好地扩展,想法是分离关注点并在 I/O 循环中使用消息并将处理委托给池。当然,整个系统的性能主要受 I/O 限制。但是,在处理大型结果集时,CPU 是一个问题。
另一个原因是“易于使用”。虽然消息的连接处理和消费是异步实现的,但worker中的代码可以是同步的和简单的。
很快就发现,通过工作人员内部的持久网络连接访问远程系统是一个问题。这就是 CommunicationChannels 的用途:在 worker 内部,我可以通过这些通道向消息总线授予请求。
我目前的一个想法是以类似的方式处理数据库访问:将语句通过队列传递到事件循环,然后将它们发送到数据库。但是,我不知道如何使用 SQLAlchemy 做到这一点。
切入点在哪里?
对象在通过队列时需要为pickled。如何从 SQA 查询中获取这样的对象?
与数据库的通信必须异步进行,以免阻塞事件循环。我可以使用例如aiomysql 作为 SQA 的数据库驱动程序?
【问题讨论】:
-
那么每个worker都是自己的进程?那时不能共享连接,所以也许你应该实例化每个(本地)SQA 池,最大 1 或 2 个连接限制。然后观察,也许通过数据库(哪个数据库?)正在产生/杀死哪些连接。在这件事上已经被严重烧毁了 - 你不想想要做的是在 SQA 之上实现你自己的幼稚 conn 池。或者尝试确定 SQA 连接是否已关闭。
-
@JLPeyret:我用您要求的信息更新了问题。不...我不打算实现自己的连接池。
-
所以,我想我记得连接不能跨进程(在操作系统的意义上,与线程区分开来)。而且我知道连接根本不能很好地腌制。您应该能够向“死”(字符串)sql 语句发送消息,但我相信您将很难传递 db conns,我认为可能包括 SQA 结果。我的猜测,但在一定程度上使用奇怪的 SQA 来证明它的合理性。
标签: python sqlalchemy rabbitmq python-multiprocessing python-asyncio