【问题标题】:Scrapy async item pipelineScrapy 异步项目管道
【发布时间】:2025-11-22 08:45:01
【问题描述】:

我正在编写一个项目管道,该管道使用 Pika 将项目拖放到 RabbitMQ 队列中。目前我正在使用阻塞/同步方法,这显然不是一个好主意。我想使用类似于this 的异步方法,但我不知道如何将延迟任务添加到已经运行的反应器中。例如 pika 连接的扭曲版本的示例代码显示如下:

d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run() # Problem is, it's already running

有人对如何将延迟任务添加到已经启动的反应器有任何指示吗?

【问题讨论】:

  • 如果你只是删除 reactor.run() 部分会发生什么,我的直觉说它会起作用,因为运行反应已经到位,你试过了吗?
  • @brunsgaard 是的,我试过了,但没有任何反应。我将通读scrapy的源代码,看看是否能找到任何线索。我有一种感觉,我需要以某种方式返回延期的 (d)
  • 我不关注.. 如果我是你,我会从管道中的 gist.github.com/brunsgaard/b11ca0b2023f66f2fea5 这样简单的东西开始,只是为了确保反应器正在运行并且延迟被触发。 (代码取自关于 deferreds 的扭曲教程)
  • 对我来说似乎它只是有效,其他人正在这样做。 github.com/darkrho/dirbot-mysql/blob/…
  • 我遵循了你的要点并且它起作用了,但只有当我在 open_spider 方法的末尾添加了一个'return d'语句时。见gist.github.com/anonymous/e04807c5352c27053f38 ...这对你有意义吗?

标签: scrapy twisted


【解决方案1】:

通常,您可以在反应器启动后使用任何 Twisted API(有一些值得注意的例外,例如 reactor.run ;)。启动一些异步操作的 Twisted API 很乐意针对正在运行的反应器(而不是尚未启动的反应器)这样做。他们也很乐意针对尚未启动的反应堆这样做(尽管在反应堆启动之前不会取得任何进展)。

如果您有一些代码在运行反应器的同一线程中运行,您可以只使用 Twisted API(例如您的示例似乎使用的ClientCreator.connectTCP;不过,请单独查看新的“端点" API,它们基本上在各方面都比ClientCreator 好)而且它们会正常工作。

如果他们不能正常工作,问题可能是他们没有做你期望他们做的事情,他们报告了一个你忽略的错误(正如问题中的示例似乎所做的那样) ,或者代码实际上并没有在反应堆所在的线程中运行。

【讨论】:

  • Jean-Paul,您关于线程的最后一点被证明是正确的,正如 brunsgaard 在我的问题中指出的那样。感谢你们俩的帮助