【问题标题】:Celery: unable to call task through flask, works outside of flask芹菜:无法通过烧瓶调用任务,在烧瓶外工作
【发布时间】:2020-01-16 11:21:16
【问题描述】:

@更新: 将任务从 shared_task 更改为 app.celeryd.celery.task 解决了这个问题。是否有一些额外的设置可以让 shared_tasks 正常工作?

完全重写还是同样的错误。我会尽量保持相对较短。新的项目结构更简洁一些,如下所示:

proj
|-- app
|   |-- controller
|   |   |-- __init__.py
|   |   +-- greeting_model.py
|   |-- model
|   |   |-- __init__.py
|   |   +-- dto
|   |       |-- __init__.py
|   |       +-- greeting_dto.py
|   |-- service
|   |   |-- __init__.py
|   |   +-- greeting_service.py
|   |-- tasks
|   |   |-- __init__.py
|   |   +-- greeting_tasks.py
|   |-- __init__.py
|   |-- celeryd.py
|   +-- flaskd.py
|-- test.py
|-- worker.py
+-- ws.py

我分别初始化 celery 和 flask 并提供 worker.py,它应该在客户端机器上运行,而 ws.py(flask Web 服务)将在另一个机器上运行。 Celery 初始化非常简单,并且使用 rpc 后端和 RabbitMQ 代理。 2 个队列目前是静态的,但稍后将从配置中填充。

from kombu import Queue
from celery import Celery


celery = Celery('LdapProvider',
                broker='amqp://admin:passwd@localhost:5672/dev1',
                backend='rpc',
                include=['app.tasks.greeting_tasks'])
celery.conf.task_queues = (
    Queue("q1", routing_key="c1.q1"),
    Queue("q2", routing_key="c2.q2"),
)

Worker.py(用于启动 celery worker - 这个问题过于简化了):

from app.celeryd import celery as celery
from celery.bin.worker import worker


if __name__ == '__main__':
    celeryd = worker(app=celery)

    options = {
        'broker': 'amqp://admin:passwd@localhost:5672/dev1',
        'queues': 'q1',
        'loglevel': 'info',
        'traceback': True
    }

    celeryd.run(**options)

我将省略烧瓶初始化并跳转到调用 celery 任务的 greeting_service.py:

# greeting_service.py:
from app.celeryd import celery
from app.tasks.greeting_tasks import say_hello


class GreetingService(object):
    def say_hello(self, name: str) -> str:
        async_result = say_hello.apply_async((name,), queue='q1')
        return async_result.get()



# greeting_tasks.py
from celery import shared_task


@shared_task(bind=True)
def say_hello(self, name: str) -> str:
    return name.capitalize()

无论我尝试什么,这个调用都会通过烧瓶失败。我创建 test.py 只是为了测试 celery 是否有效:

from app.celeryd import celery
from app.tasks.greeting_tasks import say_hello


if __name__ == '__main__':
    async_result = say_hello.apply_async(('jackie',), queue='q1')
    print(async_result.get())

与greeting_service.py 几乎相同,只是没有从作为flask_restplus 命名空间的greeting_controller 调用。 test.py 导致的区别:

/home/pupsz/PycharmProjects/provider/venv37/bin/python /home/pupsz/PycharmProjects/provider/test.py
Jackie

Process finished with exit code 0

[2020-01-16 18:56:17,065: INFO/MainProcess] Received task: app.tasks.greeting_tasks.say_hello[bb45e271-563e-405b-8529-7335a3810976]  
[2020-01-16 18:56:17,076: INFO/ForkPoolWorker-2] Task app.tasks.greeting_tasks.say_hello[bb45e271-563e-405b-8529-7335a3810976] succeeded in 0.010257695998006966s: 'Jackie'

虽然通过烧瓶,我得到的只是已经显示的内容,并且工作日志没有显示任何传入任务,这意味着通过烧瓶 apply_async 没有将任务发送到 RabbitMQ:

File "/home/xyz/PycharmProjects/proj/app/service/greeting_service.py", line 8, in say_hello
return async_result.get()
NotImplementedError: No result backend is configured.
Please see the documentation for more information.

我发现了一个与 django 类似的问题,但没有答案,所以我有点卡住了,希望得到一些指导。

【问题讨论】:

    标签: celery python-3.7


    【解决方案1】:

    解决方案:这里回答了使 shared_task 按预期工作的解决方案:LINK 修改celerz初始化为:

    from kombu import Queue
    from celery import Celery
    
    
    celery = Celery('LdapProvider',
                    broker='amqp://admin:passwd@localhost:5672/dev1',
                    backend='rpc')
                    # include=['app.tasks.greeting_tasks'])
    celery.conf.task_queues = (
        Queue("q1", routing_key="c1.q1"),
        Queue("q2", routing_key="c2.q2"),
    )
    celery.set_default()
    

    即使我要删除注释掉的包含行,工作人员也会成功获取 app.tasks.greeting_tasks 中定义的 shared_task:

    [tasks]
      . app.tasks.greeting_tasks.say_hello
    

    应用程序设置为 default_app() 后,即使使用 shared_task,也不会再抛出 NotImplementedError。至于原因......我不知道,这是不同配置和谷歌搜索的 6 小时反复试验。在某些更复杂的情况下,我发现官方文档乏善可陈。

    【讨论】:

      猜你喜欢
      • 2015-12-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-04-15
      • 1970-01-01
      相关资源
      最近更新 更多