【问题标题】:Celery First Steps - timeout error on result.get()Celery 第一步 - result.get() 上的超时错误
【发布时间】:2014-09-08 07:03:35
【问题描述】:

我在这里学习 Celery First Steps 教程:http://celery.readthedocs.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

我按照本教程使用 RabbitMQ。

当我执行 result.get(timeout=1) 时,它显示超时错误,即使它是一个简单的添加操作,我可以看到工作人员正在运行并在另一个中产生正确的结果(8 个)窗口

(venv) C:\Volt\celerytest>ipython
Python 2.7.6 (default, Nov 10 2013, 19:24:18) [MSC v.1500 32 bit (Intel)]
Type "copyright", "credits" or "license" for more information.

IPython 2.1.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

In [1]: from tasks import add

In [2]: a = add(1,3)

In [3]: a
Out[3]: 4

In [4]: a = add.delay(1,3)

In [5]: a.ready()
Out[5]: False

In [6]: a = add.delay(4,4)

In [7]: a.get(timeout=0.5)
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
<ipython-input-7-2c407a92720e> in <module>()
----> 1 a.get(timeout=0.5)

C:\Users\Som\Envs\venv\lib\site-packages\celery\result.pyc in get(self, timeout,
 propagate, interval, no_ack, follow_parents)
    167                 interval=interval,
    168                 on_interval=on_interval,
--> 169                 no_ack=no_ack,
    170             )
    171         finally:

C:\Users\Som\Envs\venv\lib\site-packages\celery\backends\amqp.pyc in wait_for(se
lf, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPA
GATE_STATES, **kwargs)
    155                                     on_interval=on_interval)
    156             except socket.timeout:
--> 157                 raise TimeoutError('The operation timed out.')
    158
    159         if meta['status'] in PROPAGATE_STATES and propagate:

TimeoutError: The operation timed out.

In [8]:

tasks.py 文件

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')


@app.task
def add(x, y):
    return x + y

工作人员日志

[tasks]
  . tasks.add

[2014-07-17 13:00:33,196: INFO/MainProcess] Connected to amqp://guest:**@127.0.0
.1:5672//
[2014-07-17 13:00:33,211: INFO/MainProcess] mingle: searching for neighbors
[2014-07-17 13:00:34,220: INFO/MainProcess] mingle: all alone
[2014-07-17 13:00:34,240: WARNING/MainProcess] celery@SomsPC ready.
[2014-07-17 13:00:34,242: INFO/MainProcess] Received task: tasks.add[85ff75d8-38
b5-442a-a574-c8b976a33739]
[2014-07-17 13:00:34,243: INFO/MainProcess] Task tasks.add[85ff75d8-38b5-442a-a5
74-c8b976a33739] succeeded in 0.000999927520752s: 4
[2014-07-17 13:00:46,582: INFO/MainProcess] Received task: tasks.add[49de7c6b-96
72-485d-926e-a4e564ccc89a]
[2014-07-17 13:00:46,588: INFO/MainProcess] Task tasks.add[49de7c6b-9672-485d-92
6e-a4e564ccc89a] succeeded in 0.00600004196167s: 8

【问题讨论】:

    标签: python rabbitmq celery django-celery


    【解决方案1】:

    在完成“Celery First Steps”后,我遇到了完全相同的问题。

    我认为这是建议backend='amqp'的原因。

    对我有用的设置如下:

    app = Celery('tasks', broker='amqp://guest@localhost//')
    app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
    

    根据文档,当使用 AMQP 结果后端时,每个结果只能检索一次(实际上是查询中的一条消息)。

    我想,您的工作进程检索它以便将结果打印到控制台:

    Task tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a] succeeded in 0.00600004196167s: 8

    所以你再次检索到相同的结果失败。

    【讨论】:

    • 谢谢。它是一个非常简单的黑客,但它连续两天让我绊倒......谢谢。但是想知道为什么他们不会在文档中指出这一点,而且我想知道它是否会影响指南的其余部分,因为该指南基于 amqp 经纪人
    • 此错误仍在文档中,没有任何建议的修复。我将提出解决此问题的请求。
    • +1 用于展示如何使用不同的结果后端!请注意,sqlalchemy 需要使用 sqlite 后端,可以通过pip install sqlalchemy 轻松安装。
    【解决方案2】:

    如果您查看this thread,似乎设置--pool=solo 也可以解决问题。这对我有用。

    【讨论】:

    • 可能是解决问题的最快方法。但请记住,使用 --pool=solo 标志调用 celery 会使 celery 使用工作池的单线程实现,这会阻止您潜在地利用并行处理。因此,我猜应该首选 Strikki 接受的解决方案。
    【解决方案3】:

    有时我也收到TimeoutError 带有redis,所以我实现了帮助函数:

    celery_app.update(
        redis_socket_timeout=5,
        redis_socket_connect_timeout=5,
    )
    
    
    def run_task(task, *args, **kwargs):
        timeout = 2 * 60
        future = task.apply_async(args, kwargs)
        time_end = time.time() + timeout
    
        while True:
            try:
                return future.get(timeout=timeout)
            except redis.TimeoutError:
                if time.time() < time_end:
                    continue
                raise
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-10-26
      • 1970-01-01
      • 2018-10-30
      • 2020-12-25
      • 2018-01-11
      • 1970-01-01
      • 2020-04-13
      相关资源
      最近更新 更多