【发布时间】:2020-05-20 06:44:26
【问题描述】:
我一直在尝试使用位于 Redis 之上的 Redis Queue 来实现一个任务队列。我摆脱了这一点,并根据我在此处描述的问题在 RabbitMQ 之上使用 Celery:Redis Queue blocking
我参考了上述(未回答的)SO 问题,因为我相信这两个问题足够相似,可以潜在地联系起来——无论是我的代码还是设置。
我可以将任务发送到我的 Celery Queue,并且可以通过在我的 Rabbit docker 容器 bash 中调用 rabbitmqctl list_queues 或通过调用
>>> add_nums.delay(2,3)
<AsyncResult: 197315b1-e18b-4945-bf0a-cc6b6b829bfb>
>>> result = add_nums.AsyncResult( 197315b1-e18b-4945-bf0a-cc6b6b829bfb)
在哪里
>>> result.status
'PENDING'
无论我检查多少次。
我尝试在装饰器调用中添加ignore_result=True,但这没有效果。
我的工人阶级:
./workerA.py
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger( __name__)
# Celery configuration
CELERY_BROKER_URL = 'amqp://***:***@rabbit:5672/' #where the asterisks indicate user, pwd
CELERY_RESULT_BACKEND = 'rpc://'
# Initialize celery
celery = Celery( 'workerA',
broker=CELERY_BROKER_URL,
backend=CELERY_RESULT_BACKEND)
@celery.task( ignore_result=True)
def add_nums( a, b):
logger.info( f'{ a+b=}')
return a+b
我的主要:
./app.py
import logging
from flask.logging import default_handler
from workerA import add_nums
from workerB import sub_nums
from flask import (
Flask,
request,
jsonify,
)
logger = logging.getLogger( )
logger.addHandler( default_handler)
logger.setLevel( logging.INFO)
app = Flask( __name__)
@app.route( '/')
def index():
return 'hello world!'
@app.route( '/add')
def add():
logger.info( 'in add method')
first_num, second_num = ( 1,2)
logger.info( f' { first_num=}')
result = add_nums.delay( first_num, second_num)
logger.info( f' {result=}')
logger.info( f' {result.state=}')
return jsonify({ 'result': result.result}), 200
@app.route( '/subtract')
def subtract():
logger.info( 'in sub method')
first_num, second_num = ( 1,2)
result = sub_nums.delay( first_num, second_num)
logger.info( f' {result=}')
return jsonify( {'result': result.result}), 200
if __name__ == '__main__':
app.run( debug=True)
无论 n 设置多高,调用 result.get( timeout=n) 总是会导致超时:简而言之,这些队列永远不会满足。
为了完整起见,我的 docker-compose.yml:
version: "3"
services:
web:
build:
context: .
dockerfile: Dockerfile
restart: always
ports:
- 5000:5000
command: python ./app.py -h 0.0.0.0
depends_on:
- rabbit
volumes:
- .:/app
rabbit:
hostname: rabbit
image: rabbitmq:management
environment:
- RABBITMQ_DEFAULT_USER=***
- RABBITMQ_DEFAULT_PASS=***
ports:
- "5673:5672"
- "15672:15672"
worker_1:
build:
context: .
hostname: worker_1
entrypoint: celery
command: -A workerA worker --loglevel=info -Q workerA
volumes:
- .:/app
links:
- rabbit
depends_on:
- rabbit
worker_2:
build:
context: .
hostname: worker_2
entrypoint: celery
command: -A workerB worker --loglevel=info -Q workerB
volumes:
- .:/app
links:
- rabbit
depends_on:
- rabbit
还有我的 Dockerfile:
FROM python:3
ADD requirements.txt /app/requirements.txt
WORKDIR /app/
RUN pip install -r requirements.txt
EXPOSE 5000
我正在使用 Docker Desktop for Mac 2.2.0.0,我的 OSX 是 10.15.2 (Catalina)
对此问题的任何帮助将不胜感激。这些队列问题现在已成为我的严重障碍
【问题讨论】:
-
您的工作人员订阅了
workerA,但您正在使用add_nums.delay(2,3)将任务发送到默认(celery)队列...试试add_nums.appy_async((2,3), queue="workerA") -
我认为它有效,但现在我不确定它是否有效。我是否需要 Pika 客户端来提取结果?我的印象是我可以通过执行 result.get() 形式的操作从 Celery 获得结果,其中结果定义为 task.delay() 或 task.apply_assync()
-
如果我是对的,
result.get()将永远阻塞...但是如果您将任务发送到活动队列,那么它应该会很快返回并获得预期的结果。 -
我在 workerA.py 中更改了 Celery 对象 (celery) 的定义,现在将其定义为
celery = Celery( 'celery', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND)这应该意味着在调用该方法时,add_nums.apply_async( (2,3), queue='celery')应该具有使用上述前提,效果与add_nums.delay(2,3)相同。在这两种情况下,对此调用.get()无效,并且状态将永远保持PENDING -
是的,这正是我所期望的,因为您的工作人员订阅了
workerA队列(... -Q workerA ...),并且您明确发送到celery队列......我是退出这次谈话......
标签: python python-3.x docker rabbitmq celery