【问题标题】:How can I leverage django models in an asyncio co-routine launched within a celery task?如何在 celery 任务中启动的 asyncio 协程中利用 django 模型?
【发布时间】:2019-02-16 18:23:33
【问题描述】:

我重构了一些 django 代码来进行网络抓取。我为正在为其进行数据抓取的每个用户启动一个单独的 Celery 任务。在每个 Celery 任务中,我使用 asyncio 和 aiohttp 为给定的用户进行抓取。

我可以访问我所有的 django 模型类和方法,但是一旦我执行某些操作来触发实际的数据库查询,就会收到如下错误:

...
[2019-02-16 18:04:38,126] WARNING log /home/chrisadmin/anaconda3/lib/python3.6/site-packages/celery/app/trace.py:561: RuntimeWarning: Exception raised outside body: OperationalError('SSL SYSCALL error: Bad file descriptor\n',):
Traceback (most recent call last):
  File "/home/chrisadmin/anaconda3/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
    return self.cursor.execute(sql, params)
psycopg2.OperationalError: SSL SYSCALL error: Socket operation on non-socket
...

在 Celery 任务中,只要不涉及 asyncio,我就可以做一些使 Django 与数据库交互而不会出现任何问题的事情。同样,只要不依次从 Celery 任务中启动这些异步任务,我就可以通过 Django 在异步任务中成功地与数据库交互。

如果我设置了CELERY_TASK_ALWAYS_EAGER=True,我不会遇到异常,但在这种情况下当然不会同时运行 Celery 任务。

对于抓取单个用户,asyncio/aiohttp 绰绰有余。但我想使用 Celery 能够跨进程/机器扩展并并行抓取多个用户。以前我曾尝试专门使用 Celery,但我尝试使用 asyncio/aiohttp 进行重构以减少不必要的开销。

我希望能够使用 Celery 为多个用户并行启动抓取,然后在每个 Celery 任务中,我希望能够抓取相应的用户,包括通过 django 模型/方法保存他们抓取的数据。

【问题讨论】:

    标签: postgresql celery python-3.6 python-asyncio django-2.1


    【解决方案1】:

    经过进一步调查后,似乎数据库连接存在问题,并且在结合 asyncio 和 celery 时可能存在线程安全问题,我并不完全理解。

    目前看来可行的解决方案是创建一个新函数:

    from django.db import connections
    from django.conf import settings
    
    
    def reset_db_connections():
        if not settings.CELERY_TASK_ALWAYS_EAGER:
            connections.close_all()
    

    我将此函数称为任何 celery 任务的第一行:

    @shared_task(bind=True)
    def my_celery_task(self, args):
        reset_db_connections()
        # do stuff
        # call stuff that uses asyncio
    

    到目前为止,无论我对CELERY_TASK_ALWAYS_EAGER 有什么设置,这似乎都允许我的代码工作。

    我最初尝试用 connections.close_all() 代替:

     for conn in db.connections.all():
         if conn.connection.closed != 0:
             conn.connection.close()
    

    但这会导致错误,因为我没有关闭需要关闭的连接和/或我正在关闭已经关闭的连接。

    作为上述解决方案的替代方案,我发现更改设置:

    DATABASES = {"default": dj_database_url.config(conn_max_age=600)}

    DATABASES = {"default": dj_database_url.config(conn_max_age=0)}

    也解决了这个问题。但是,据我了解,设置 conn_max_age=0 将为每个数据库操作使用一个新连接,这似乎不是一个好主意。使用上面的reset_db_connections() 方法,我可以离开conn_max_age=600

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-08-13
      • 2019-12-15
      • 2016-05-23
      • 2013-12-16
      • 2020-02-27
      • 1970-01-01
      • 2019-09-25
      相关资源
      最近更新 更多