【问题标题】:Process Celery Tasks results in arrival order处理 Celery 任务导致到达顺序
【发布时间】:2023-04-07 18:08:01
【问题描述】:

我是 celery 新手,我有一个“celery-server”,就像下面的代码一样,它会在一段时间后返回结果,具体取决于计算。我在下面这个简单的程序中使用 sleep 功能模拟了这种行为。我想要的是在“重结果”之前处理早期返回的结果。我写了一个简单的程序,见下面的片段,它故意创建“重载”任务作为第一次调用。 请注意,随后的调用会创建“更轻”的任务,因此 celery 服务器会更早地返回它们。因此,我想根据它们到达客户端的顺序来处理返回的结果。现在(参见客户端代码)它一直等到繁重的任务返回。

但是对于 celery 文档中的示例,我应该通过检查 id 来等待结果,或者轮询它们(这很愚蠢,因为 celery 客户端必须以某种方式检查“第一个”到达结果的 id 我猜)。

如何按到达客户的顺序处理 celery 的结果?我不想在无休止的循环中轮询“result.ready()”,因为这完全搞砸了恕我直言,不知何故异步处理的感觉。

在文档中找不到解决方案。我想要做的是“获得第一个到达的结果并获得 id”,将其与我的“result.id”(我是否发送了任务?)进行比较,然后进行相应的处理。

#
# Name this code "tasks.py"  and run it with:
# celery worker -A tasks  --loglevel=info
#
from celery import Celery
import time
app = Celery('tasks', backend='amqp', broker='amqp://guest:guest@127.0.0.1:5672/%2F')

@app.task()
def add(x,y):
  print("x=%s y=%s" % (x,y))
  time.sleep(x)
  return x + y

对客户端进行第二次编程:这就像 celery 文档一样工作,但是 celery 已经完成了 0,1,2(因此客户端应该对其进行处理)。

#!/usr/bin/python3
from tasks import add

results = []
max = 4
for i in range(0,max):
  print(max-(i+1))
  result = add.delay(max-(i+1),0)
  results.append(result)

print("") 

for i in range(0,max):
  result = results[i].get(timeout=10)
  print(result)

结果:(最后 4 个数字应按到达顺序出现,即 0,1,2,3)

3
2
1
0

3
2
1
0

【问题讨论】:

    标签: python rabbitmq celery amqp


    【解决方案1】:

    您应该实现一个回调,而不是按照发送到队列的顺序循环遍历结果:

    http://celery.readthedocs.org/en/latest/userguide/calling.html#linking-callbacks-errbacks

    在tasks.py中:

    @app.task()
    def process_add(result):
        print(result)
    

    在client.py中:

    from tasks import add, process_add
    results = []
    max = 4
    for i in range(0,max):
        print(max-(i+1))
        add.apply_async((max-(i+1), 0), link=process_add.s())
    

    【讨论】:

    • 将其更改为: result = add.apply_async((max-(i+1), 0), link=process_at_client.s() ) 它只是在服务器端给我一个错误,该 process_at_client 不存在。似乎在文档之后,这只是服务器端的“链接”功能?
    • 是的,对不起。您需要将它添加到您的 tasks.py 并添加任务装饰器。也无需检查结果。上面更正了。
    • 现在没有语法错误。但是 process_add 仍然在服务器端而不是在接收客户端执行,它仍然需要使用 resutls.get() 拉结果:我可以看到任务是在服务器端链接的,所以在添加 on 服务器,process_add 被称为 on 服务器。但实际上我希望这部分在客户端上运行... :-( [2015-09-11 08:43:21,096: INFO/MainProcess] Task tasks.add[fd1d64ad-3d02-4a9b-9a38-0b2fd0511eaf] 在 0.024234951997641474 中成功s: 0 [2015-09-11 08:43:21,104: INFO/MainProcess] 任务 tasks.process_add[d89b6cef-155e-4c18-bf72-ccdd3bb69b1c] succ
    • 听起来您实际上也在寻找客户端中的异步行为。您需要一种方法让工作人员在处理完作业后异步触发调用者。这听起来很像另一个芹菜队列,只是这一次工人是“客户”。
    • 我知道你要去哪里,这需要大量工作才能使客户端“后处理”任务成为服务器上的芹菜任务,并在客户端和芹菜服务器之间进行大量信息传输。叹息 - 不是一个选择
    猜你喜欢
    • 1970-01-01
    • 2018-06-03
    • 1970-01-01
    • 1970-01-01
    • 2012-06-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多