【问题标题】:Celery-RabbitMQ Distributed Queue Test MessageCelery-RabbitMQ 分布式队列测试消息
【发布时间】:2014-04-29 21:03:47
【问题描述】:

当我在终端上运行celery -A tasks worker 时,我不断收到:ERROR/MainProcess] consumer: Cannot connect to amqp://ec2celeryuser

基本上我要做的是让 celery/rabbitmq 在 (2) 个 ec2 实例中正常工作。将tasks.py 中的一个愚蠢任务传递给rabbitmq 进行处理。

  1. 实例 1 - 房子 rabbitMQ

目前 RabbitMQ 运行良好。如果我运行sudo rabbitmqctl status,它会输出:

Status of node 'rabbit@ip-xx-xxx-xxx-xx' ...
[{pid,786},

2。实例 2 - Houses Celery

我正在尝试在终端中使用以下内容在 instance 2 上针对实例 1 运行 celery:

celery - 一个任务工作者

我有一个文件celeryconfig.py

BROKER_URL = 'amqp://ec2celeryuser:mypasshere@xx.xxx.xx.xx:5672/celeryserver1/'

#CELERY SETTINGS
CELERY_IMPORTS = ("tasks",)

CELERY_RESULT_BACKEND = "amqp"

我有一个文件client.py

from tasks import add

result = add.delay(4, 4) # call task
result_sum = result.get(timeout=5) # wait to get result for a maximum of 5 seconds

我有一个文件tasks.py

from celery import Celery

app = Celery('tasks', broker='amqp://ec2celeryuser:mypasshere@xx.xxx.xx.xx:5672/celeryserver1/')

@app.task
def add(x, y):
    return x + y

我已经正确设置了一个vhost,一个用户ec2celeryuser,并赋予这个用户以下权限:

sudo rabbitmqctl set_permissions -p /celeryserver1 ec2celeryuser ".*" ".*" ".*"

如果我这样做:sudo rabbitmqctl list_users 在 RabbitMQ(实例 1)上显示:

ec2celeryuser   []
guest   [administrator

我已经尝试了两个用户名及其密码,但没有任何变化。

我一直在关注Celery Guidetutorial,但运气不佳。

我在这里做错了什么?显然存在连接问题,但我做错了什么?

谢谢!

【问题讨论】:

  • 一个疯狂的猜测,但你的密码中有一些奇怪的字符吗?
  • 不,它的字母数字,没有特殊字符。

标签: python rabbitmq celery message-queue


【解决方案1】:

感谢用户 natdempk 帮助我修复队列的配置语法。

问题是在rabbitmq 中创建vhost,例如:

sudo rabbitmqctl add_vhost /celeryserver1

应该是什么时候:

sudo rabbitmqctl add_vhost celeryserver1

然后我不得不为我的用户 ec2celeryuser 重置权限,例如:

sudo rabbitmqctl set_permissions -p celeryserver1 ec2celeryuser ".*" ".*" ".*"

我意识到这是问题的方式是:我访问了/var/log/rabbitmq/<last log file.log>

看到了:

=INFO REPORT==== 30-Apr-2014::12:45:58 ===
accepted TCP connection on [::]:5672 from xx.xxx.xxx.xxx:45964

=INFO REPORT==== 30-Apr-2014::12:45:58 ===
starting TCP connection <x.xxx.x> from from xx.xxx.xxx.xxx:45964

=ERROR REPORT==== 30-Apr-2014::12:46:01 ===
exception on TCP connection <x.xxx.x> from from xx.xxx.xxx.xxx:45964
{channel0_error,opening,
                {amqp_error,access_refused,
                            "access to vhost 'celeryserver1/' refused for user 'ec2celeryuser'",
                            'connection.open'}}

自从修复了虚拟主机,我现在很高兴看到:

[2014-04-30 13:08:10,101: WARNING/MainProcess] celery@ip-xx-xxx-xx-xxx ready.

【讨论】:

    【解决方案2】:

    所以我在这里看到了一些问题。首先,tasks.py 中 rabbitMQ 的代理 URL 似乎不正确。它应该如下所示。

    app = Celery('tasks', broker='amqp://ec2celeryuser:ec2celerypassword@xx.xxx.xx.xx/celeryserver1/')

    此外,您可能希望在运行工作进程时指定您希望 celery 服务的应用程序。您可以通过从tasks.py 所在的目录运行celery -A tasks worker 来完成此操作。

    另一件事是您在client.py 中调用您的任务的代码似乎不正确。从 celery 文档中,您可以按如下方式调用任务:

    from tasks import add
    
    result = add.delay(4, 4) # call task
    result_sum = result.get(timeout=5) # wait to get result for a maximum of 5 seconds
    

    解决这些问题可能会解决您的问题,或者至少让您更接近。

    【讨论】:

    • 感谢您的彻底回复。我已经实现了这一点,但仍然得到 ERROR/MainProcess] 消费者:无法连接到 amqp://ec2celeryuser@xx.xxx.xx.xx:5672/celeryserver1/ 任何其他想法?
    • @CodeTalk 我认为您的代理 URL 仍然是错误的。它们的格式应为:amqp://user:password@IP:port/vhost/
    • @CodeTalk 你还缺少:broker 中的端口吗?
    • 它在那里。所有代码都是最新的。有任何想法吗?仍然抛出同样的问题
    • 我这样做了:sudo rabbitmqctl list_users,它显示:ec2celeryuser [] guest [administrator] 这可能有问题吗?