【问题标题】:How to combine Celery with asyncio?如何将 Celery 与 asyncio 结合使用?
【发布时间】:2017-02-10 11:07:20
【问题描述】:

如何创建一个使 celery 任务看起来像 asyncio.Task 的包装器?或者有没有更好的方法将 Celery 与 asyncio 集成?

@asksol,Celery 的创造者,said this:

将 Celery 用作异步 I/O 框架之上的分布式层是很常见的(重要提示:将 CPU 绑定任务路由到 prefork worker 意味着它们不会阻塞您的事件循环)。

但我找不到任何专门针对 asyncio 框架的代码示例。

【问题讨论】:

  • 你能澄清一下“看起来”是什么意思吗?我想您可能误解了 Asksol 的评论——您将 celery 放在了作为异步管理器的 Rabbit 或 SQS 等框架的前面。因此,您可以为使用 asyncio 的 celery 创建一个代理/插件,但任务不会“看起来像”(即具有 asyncio 的接口)? celery 的重点是抽象使用的异步方法?

标签: python python-3.x asynchronous celery python-asyncio


【解决方案1】:

这是我在必要时处理异步协程的 Celery 实现:

包装 Celery 类以扩展其功能:

from celery import Celery
from inspect import isawaitable
import asyncio


class AsyncCelery(Celery):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task

        class ContextTask(TaskBase):
            abstract = True

            async def _run(self, *args, **kwargs):
                result = TaskBase.__call__(self, *args, **kwargs)
                if isawaitable(result):
                    await result

            def __call__(self, *args, **kwargs):
                asyncio.run(self._run(*args, **kwargs))

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app

        conf = {}
        for key in app.config.keys():
            if key[0:7] == 'CELERY_':
                conf[key[7:].lower()] = app.config[key]

        if 'broker_transport_options' not in conf and conf.get('broker_url', '')[0:4] == 'sqs:':
            conf['broker_transport_options'] = {'region': 'eu-west-1'}

        self.config_from_object(conf)


celery = AsyncCelery()

【讨论】:

    【解决方案2】:

    这是一个简单的助手,您可以使用它来让 Celery 任务等待:

    import asyncio
    from asgiref.sync import sync_to_async
    
    # Converts a Celery tasks to an async function
    def task_to_async(task):
        async def wrapper(*args, **kwargs):
            delay = 0.1
            async_result = await sync_to_async(task.delay)(*args, **kwargs)
            while not async_result.ready():
                await asyncio.sleep(delay)
                delay = min(delay * 1.5, 2)  # exponential backoff, max 2 seconds
            return async_result.get()
        return wrapper
    

    sync_to_async一样,可以直接作为包装器使用:

    @shared_task
    def get_answer():
        sleep(10) # simulate long computation
        return 42    
    
    result = await task_to_async(get_answer)()
    

    ...作为装饰者:

    @task_to_async
    @shared_task
    def get_answer():
        sleep(10) # simulate long computation
        return 42    
    
    result = await get_answer()
    

    当然,这不是一个完美的解决方案,因为它依赖于polling。 但是,在 Celery officially provides a better solution 之前从 Django 异步视图调用 Celery 任务应该是一个很好的解决方法。

    编辑 2021/03/02:添加对 sync_to_async 的调用以支持 eager mode

    【讨论】:

    • 这是一个可靠的解决方法,我们已经在我的回答中提到的 FastAPI 应用程序中使用了它(虽然不是作为装饰器):) 请记住,您需要注意错误处理并有一个计划如何处理任何潜在的异常!
    • task_to_async 调用 AsyncResult.get(),这会重新引发任务引发的任何异常。当然,如果你想自定义这个行为,你可以在task_to_async添加参数,然后转发给async_result.get()
    • 将任务包装在异步助手中的意义何在?你不能只用睡眠来实现循环,没有它吗? Afaik task.delay 是非阻塞的。只有类似 task.get 的东西会阻塞。
    【解决方案3】:

    编辑:2021 年 1 月 12 日以前的答案(在底部找到)没有很好地老化,因此我添加了可能的解决方案组合,这些解决方案可能会满足那些仍然在寻找如何共同使用 asyncio 和芹菜

    让我们先快速分解用例(更深入的分析在这里:asyncio and coroutines vs task queues):

    • 如果任务受 I/O 限制,则使用协程和异步会更好。
    • 如果任务受 CPU 限制,则最好使用 Celery 或其他类似的任务管理系统。

    因此,在 Python 的“做一件事并把它做好”的上下文中,不要尝试将 asyncio 和 celery 混合在一起是有意义的。

    但是,如果我们希望能够以异步方式和异步任务的形式运行方法,会发生什么情况?那么我们有一些选择可以考虑:

    • 我能找到的最好的例子如下:https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/(我刚刚发现它是@Franey's response):

      1. 定义你的异步方法。

      2. 使用asgirefsync.async_to_sync模块封装异步方法并在一个celery任务中同步运行:

        # tasks.py
        import asyncio
        from asgiref.sync import async_to_sync
        from celery import Celery
        
        app = Celery('async_test', broker='a_broker_url_goes_here')
        
        async def return_hello():
            await asyncio.sleep(1)
            return 'hello'
        
        
        @app.task(name="sync_task")
        def sync_task():
            async_to_sync(return_hello)()
        
    • 我在FastAPI 应用程序中遇到的一个用例与上一个示例相反:

      1. CPU 密集型进程占用了异步端点。

      2. 解决方案是将异步 CPU 绑定进程重构为 celery 任务,并从 Celery 队列中传递一个任务实例以执行。

      3. 可视化该案例的最小示例:

        import asyncio
        import uvicorn
        
        from celery import Celery
        from fastapi import FastAPI
        
        app = FastAPI(title='Example')
        worker = Celery('worker', broker='a_broker_url_goes_here')
        
        @worker.task(name='cpu_boun')
        def cpu_bound_task():
            # Does stuff but let's simplify it
            print([n for n in range(1000)])
        
        @app.get('/calculate')
        async def calculate():
            cpu_bound_task.delay()
        
        if __name__ == "__main__":
            uvicorn.run('main:app', host='0.0.0.0', port=8000)
        
    • 另一个解决方案似乎是 @juanra@danius 在他们的答案中提出的,但我们必须记住,当我们混合同步和异步执行时,性能往往会受到影响,因此这些答案需要在之前进行监控我们可以决定在 prod 环境中使用它们。

    最后,有一些现成的解决方案,我不能推荐(因为我自己没有使用过),但我会在这里列出它们:

    • Celery Pool AsyncIO 这似乎完全解决了 Celery 5.0 没有解决的问题,但请记住,它似乎有点实验性(今天 0.2.0 版 01/12/2021)
    • aiotasks 声称是“一个类似于 Celery 的任务管理器,它分发 Asyncio 协程”,但似乎有点陈旧(大约 2 年前的最新提交)

    嗯,它的年龄没有那么好,是吗? Celery 5.0 版没有实现异步兼容性,因此我们无法知道何时以及是否会实现......出于响应遗留原因(因为它是当时的答案)和评论继续,将其留在这里。

    如官方网站所述,这将从 Celery 5.0 版中实现:

    http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

    1. Celery 的下一个主要版本将仅支持 Python 3.5,我们计划在其中利用新的 asyncio 库。
    2. 放弃对 Python 2 的支持将使我们能够删除大量的兼容性代码,而使用 Python 3.5 使我们能够利用打字、异步/等待、异步和类似概念,在旧版本中没有其他选择。

    以上内容来自上一个链接。

    所以最好的办法就是等待5.0版发布!

    与此同时,祝你编码愉快:)

    【讨论】:

    • 这没有发生,而且 celery 5 与 asyncio 不兼容。
    • @piro 我还没用过 celery 5,我会进一步调查!感谢更新
    • @piro 好吧,我做了我的研究并重构了这个答案,希望你能在那里找到有用的东西!
    • 我打开了a feature request,他们回答“这是我们为 celery 6.0 计划的更大设计决策的一部分”。
    • 在我们得到 Celery 的官方支持之前,我发现 polling the status of the AyncResult 提供了一个很好的解决方法。
    【解决方案4】:

    我通过在 celery-pool-asyncio 库中结合 Celery 和 asyncio 解决了问题。

    【讨论】:

    • 这实际上似乎是一个很好的解决方案,唯一的问题是它不支持 celery 5。有任何时间表吗?
    【解决方案5】:

    这种简单的方法对我来说效果很好:

    import asyncio
    from celery import Celery
    
    app = Celery('tasks')
    
    async def async_function(param1, param2):
        # more async stuff...
        pass
    
    @app.task(name='tasks.task_name', queue='queue_name')
    def task_name(param1, param2):
        asyncio.run(async_function(param1, param2))
    

    【讨论】:

      【解决方案6】:

      我发现的最简洁的方法是将async 函数包装在asgiref.sync.async_to_sync 中(来自asgiref):

      from asgiref.sync import async_to_sync
      from celery.task import periodic_task
      
      
      async def return_hello():
          await sleep(1)
          return 'hello'
      
      
      @periodic_task(
          run_every=2,
          name='return_hello',
      )
      def task_return_hello():
          async_to_sync(return_hello)()
      

      我从我写的blog post 中提取了这个例子。

      【讨论】:

      • 非常好,我在研究这个问题的过程中发现了你的文章,并将它包含在我的答案的编辑中(我现在当然要提到你,因为我发现了它)!感谢您的知识提升:)
      • 谢谢!看到对我文章的引用总是很酷,即使它在同一个线程中。
      【解决方案7】:

      您可以使用run_in_executor 将任何阻塞调用包装到任务中,如documentation 中所述,我还在示例中添加了自定义timeout

      def run_async_task(
          target,
          *args,
          timeout = 60,
          **keywords
      ) -> Future:
          loop = asyncio.get_event_loop()
          return asyncio.wait_for(
              loop.run_in_executor(
                  executor,
                  functools.partial(target, *args, **keywords)
              ),
              timeout=timeout,
              loop=loop
          )
      loop = asyncio.get_event_loop()
      async_result = loop.run_until_complete(
          run_async_task, your_task.delay, some_arg, some_karg="" 
      )
      result = loop.run_until_complete(
          run_async_task, async_result.result 
      )
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-04-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多