【问题标题】:celery - call function on task donecelery - 完成任务时调用函数
【发布时间】:2012-03-06 00:20:24
【问题描述】:

我将 celery 与 django 和 rabbitmq 一起使用来创建消息队列。我还有一个工人,它来自不同的机器。在 django 视图中,我正在启动这样的过程:

def processtask(request, name):
  args = ["ls", "-l"]
  MyTask.delay(args)
  return HttpResponse("Task set to execute.")

我的任务是这样配置的:

class MyTask(Task):
  def run(self, args):
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    return out

我现在的问题是代理(我的 django 项目)现在如何接收工作人员在其计算机上执行的“ls -l”命令的输出。我想最好的办法是工人在准备好发送已执行命令的输出时调用代理中的函数。

我想异步接收worker的输出,然后用输出更新网页,但那是另一次了。现在我只想接收工人的输出。

更新

现在我添加了一个在任务结束时触发的 HTTP GET 请求,通知 Web 应用程序任务已完成 - 我还在 http GET 中发送 task_id。 http GET方法调用django视图,创建AsyncResult并获取结果,但问题是调用result.get()时出现如下错误:

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
  "Polling results with transaction isolation level"

任何想法为什么?我没有使用数据库,因为我使用的是带有 AMQP 的 rabbitmq。

更新。

我非常想使用第三个选项,这似乎是最好的选择 - 用于小返回值和大返回值。我的整个任务如下所示:

class MyTask(Task):
  def __call__(self, *args, **kwargs):
    return self.run(*args, **kwargs)

  def after_return(self, status, retval, task_id, args, kwargs, einfo):
    if self.webhost is not None:
      conn = httplib.HTTPConnection(self.webhost, self.webport)
      conn.request("HEAD", "/vuln/task/output/"+task_id)

  def run(self, args, webhost=None, webport=None):
    self.webhost = webhost
    self.webport = webport
    r = "This is a basic result string used for code clarity"
    return r

所以我重写了 after_return 函数,它也应该释放我的任务的锁,因为任务的 run() 函数已经返回了一个值。在 HEAD 请求中,我基本上是在调用一个 django 函数,它在 task_id 上调用 AsyncResult,它应该提供任务的结果。在我的案例中,我使用任意结果进行测试,因为它仅用于测试。

我想知道为什么上面的代码不起作用。我可以使用 on_success,但我认为它不会产生影响 - 还是会?

【问题讨论】:

  • 你能把命令的输出保存在数据库中吗?
  • 您好,不,因为工作人员无权访问经纪人的数据库,我也不希望他们有权访问。我肯定需要发回一个结果,然后在代理中处理它。
  • 也许您可以创建一个 HTTP API 来发回结果?在 Django 中有一些非常简单的方法可以做到这一点。
  • 是的,我进行了一个 HTTP GET 调用,它发回了一个 ID。然后 Web 应用程序应该只读取任务的输出,但它不起作用 - 我已经用失败的结果更新了我的问题。
  • 我不明白你在做什么 - 你没有发布你的代码。但是,如果要使用 URL 来存储结果,那么它绝对应该在 GET 上,这将违反 RFC2616。考虑 POST。

标签: python django rabbitmq celery


【解决方案1】:

如果您查看here,您会发现以下内容:

Django-celery 使用 MySQL 来跟踪所有任务/结果,rabbit-mq 基本上用作通信总线。

真正发生的事情是,您在任务仍在运行时尝试获取工作人员的ASyncResult(该任务向您的服务器调用了一个 HTTP 请求,并且由于它尚未返回,因此数据库锁定会话从工作人员仍然处于活动状态,结果行仍然被锁定)。当 Django 尝试读取任务结果(其状态和 run 函数的实际返回值)时,它会发现行被锁定并发出警告。

有几种方法可以解决这个问题:

  1. 设置另一个 celery 任务以获取结果并将其链接到您的处理任务。这样原始任务将完成,释放 db 上的锁,新任务将获取它,在 django 中读取结果并执行您需要执行的任何操作。在这方面查找 celery 文档。

  2. 完全不用费心,只需对 Django 进行 POST,并将完整处理结果作为有效负载附加,而不是尝试通过 db 获取它。

  3. 在您的任务类中覆盖 on_success 并将您的通知请求发布到 Django,然后应该在 db 表上释放锁。

请注意,您需要将整个处理结果(无论它有多大)存储在 run 方法的返回中(可能是腌制的)。你没有提到结果有多大,所以实际上只做上面的场景 #2 可能是有意义的(这就是我要做的)。或者我会选择#3。也不要忘记在你的任务中处理 on_failure 方法。

【讨论】:

  • 感谢您的评论。我已经更新了我的答案以提出其他问题,在接受你的答案之前我需要回答这个问题,顺便说一句,这真的很好。