【问题标题】:Tasks Stuck in Celery Queue任务卡在 Celery 队列中
【发布时间】:2020-05-20 06:44:26
【问题描述】:

我一直在尝试使用位于 Redis 之上的 Redis Queue 来实现一个任务队列。我摆脱了这一点,并根据我在此处描述的问题在 RabbitMQ 之上使用 Celery:Redis Queue blocking

我参考了上述(未回答的)SO 问题,因为我相信这两个问题足够相似,可以潜在地联系起来——无论是我的代码还是设置。

我可以将任务发送到我的 Celery Queue,并且可以通过在我的 Rabbit docker 容器 bash 中调用 rabbitmqctl list_queues 或通过调用

来看到它们坐在那里

>>> add_nums.delay(2,3)

<AsyncResult: 197315b1-e18b-4945-bf0a-cc6b6b829bfb>

>>> result = add_nums.AsyncResult( 197315b1-e18b-4945-bf0a-cc6b6b829bfb)

在哪里

>>> result.status 'PENDING'

无论我检查多少次。

我尝试在装饰器调用中添加ignore_result=True,但这没有效果。

我的工人阶级:

./workerA.py
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger( __name__)

# Celery configuration
CELERY_BROKER_URL = 'amqp://***:***@rabbit:5672/' #where the asterisks indicate user, pwd
CELERY_RESULT_BACKEND = 'rpc://'

# Initialize celery
celery = Celery( 'workerA', 
        broker=CELERY_BROKER_URL,
        backend=CELERY_RESULT_BACKEND)

@celery.task( ignore_result=True)
def add_nums( a, b):
    logger.info( f'{ a+b=}')
    return a+b

我的主要:

./app.py
import logging
from flask.logging import default_handler
from workerA import add_nums
from workerB import sub_nums
from flask import (
        Flask,
        request,
        jsonify,
    )

logger = logging.getLogger( )
logger.addHandler( default_handler)
logger.setLevel( logging.INFO)

app = Flask( __name__)

@app.route( '/')
def index():
    return 'hello world!'

@app.route( '/add')
def add():
    logger.info( 'in add method')
    first_num, second_num = ( 1,2)

    logger.info( f' { first_num=}')

    result = add_nums.delay( first_num, second_num)
    logger.info( f' {result=}')
    logger.info( f' {result.state=}')

    return jsonify({ 'result': result.result}), 200

@app.route( '/subtract')
def subtract():
    logger.info( 'in sub method')
    first_num, second_num = ( 1,2)

    result = sub_nums.delay( first_num, second_num)
    logger.info( f' {result=}')

    return jsonify( {'result': result.result}), 200

if __name__ == '__main__':
    app.run( debug=True)

无论 n 设置多高,调用 result.get( timeout=n) 总是会导致超时:简而言之,这些队列永远不会满足。

为了完整起见,我的 docker-compose.yml:

version: "3"
services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    restart: always
    ports:
      - 5000:5000
    command: python ./app.py -h 0.0.0.0
    depends_on:
      - rabbit
    volumes:
      - .:/app
  rabbit:
    hostname: rabbit
    image: rabbitmq:management
    environment:
      - RABBITMQ_DEFAULT_USER=***
      - RABBITMQ_DEFAULT_PASS=***
    ports:
      - "5673:5672"
      - "15672:15672"
  worker_1:
    build:
      context: .
    hostname: worker_1
    entrypoint: celery
    command: -A workerA worker --loglevel=info -Q workerA
    volumes:
      - .:/app
    links:
      - rabbit
    depends_on:
      - rabbit
  worker_2:
    build:
      context: .
    hostname: worker_2
    entrypoint: celery
    command: -A workerB worker --loglevel=info -Q workerB
    volumes:
      - .:/app
    links:
      - rabbit
    depends_on:
      - rabbit

还有我的 Dockerfile:

FROM python:3

ADD requirements.txt /app/requirements.txt

WORKDIR /app/

RUN pip install -r requirements.txt

EXPOSE 5000

我正在使用 Docker Desktop for Mac 2.2.0.0,我的 OSX 是 10.15.2 (Catalina)

对此问题的任何帮助将不胜感激。这些队列问题现在已成为我的严重障碍

【问题讨论】:

  • 您的工作人员订阅了workerA,但您正在使用add_nums.delay(2,3) 将任务发送到默认(celery)队列...试试add_nums.appy_async((2,3), queue="workerA")
  • 我认为它有效,但现在我不确定它是否有效。我是否需要 Pika 客户端来提取结果?我的印象是我可以通过执行 result.get() 形式的操作从 Celery 获得结果,其中结果定义为 task.delay() 或 task.apply_assync()
  • 如果我是对的,result.get() 将永远阻塞...但是如果您将任务发送到活动队列,那么它应该会很快返回并获得预期的结果。
  • 我在 workerA.py 中更改了 Celery 对象 (celery) 的定义,现在将其定义为 celery = Celery( 'celery', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND) 这应该意味着在调用该方法时,add_nums.apply_async( (2,3), queue='celery') 应该具有使用上述前提,效果与add_nums.delay(2,3) 相同。在这两种情况下,对此调用.get() 无效,并且状态将永远保持PENDING
  • 是的,这正是我所期望的,因为您的工作人员订阅了workerA 队列(... -Q workerA ...),并且您明确发送到celery 队列......我是退出这次谈话......

标签: python python-3.x docker rabbitmq celery


【解决方案1】:

看起来这个问题的原因是因为没有配置后端来存储结果。用 Celery(..., backend='rpc://') 实例化 Celery 对象似乎除了使“NotImplementedError: No result backend is configured”错误静音之外什么也没做,否则你会得到。我认为这种意义上的文档具有误导性。

开始试用 Redis 结果后端以提高性能。我在其他地方也有 Elasticsearch 和 MongoDB 用于我可以定位的应用程序,但更喜欢 Redis。完成后会在午餐后反馈结果。

【讨论】:

  • 是的,为我的 redis 实例切换 backend='rpc://' - 在这种情况下恰好是 redis://redis:6379 工作正常。所以要么建议的“rpc://”在一般情况下不起作用,或者它在 Docker 中不起作用
最近更新 更多