【问题标题】:Twisted: Waiting for subtasks to finishTwisted:等待子任务完成
【发布时间】:2013-12-18 15:27:15
【问题描述】:

在我的代码中,我有两个假设的任务:一个从生成器获取 url 并使用 Twisted 的合作器批量下载它们,另一个获取下载的源并异步解析它。我正在尝试将所有获取和解析任务封装到单个 Deferred 对象中,该对象在下载所有页面并解析所有源时回调。

我想出了以下解决方案:

from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage


BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True

    deferreds = []

    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)

    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)

代码有效,但我觉得好像我遗漏了一些明显的东西,或者不知道一个简单的 Twisted 模式会使这变得更简单。有没有更好的方法来返回一个在所有获取和解析完成时回调的 Deferred?

【问题讨论】:

  • Undefined name 'parse'Undefined name 'get_urls'Undefined name 'task_finished'。如果问题中的示例代码实际运行,则确保问题的答案正确会容易得多:)。

标签: python asynchronous callback twisted deferred


【解决方案1】:

按照目前的编写,在我看来,此代码的并行下载数量有限,但并行解析作业数量不受限制。这是故意的吗?我将假设“不”,因为如果您的网络碰巧很快而您的解析器碰巧很慢,因为 URL 的数量接近无穷大,那么您的内存使用量也是如此:)。

所以这是一个并行性有限但会随着下载顺序执行解析的东西,而不是:

from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parse)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))

task.react(main_task)

这是因为parse(显然)返回一个Deferred,将其作为回调添加到getPage 返回的回调会导致Deferred 不会调用coiterate 添加的回调直到parse 完成其业务。

由于您询问的是惯用的 Twisted 代码,我还冒昧地对其进行了一些现代化改造(使用 task.react 而不是手动运行反应器,内联表达式以使事情更简洁等等)。

如果您确实希望获得比并行提取更多的并行解析,那么这样的方法可能会更好:

from twisted.internet import defer, task
from twisted.web.client import getPage

PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10

def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)

    def parseWhenReady(r):
        def parallelParse(_):
            parse(r).addBoth(
                lambda result: parseSemaphore.release().addCallback(
                    lambda _: result
                )
            )
        return parseSemaphore.acquire().addCallback(parallelParse)

    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(PARALLEL_FETCHES)])
            .addCallback(lambda done:
                         defer.DeferredList(
                            [parseSemaphore.acquire()
                             for _ in xrange(PARALLEL_PARSES)]
                         ))
            .addCallback(task_finished))

task.react(main_task)

您可以看到parseWhenReady 返回从acquire 返回的Deferred,因此一旦并行解析可以开始,并行提取就会继续,因此您不会继续提取即使在解析器超载时也不加选择。但是,parallelParse 谨慎地避免返回由parserelease 返回的Deferred,因为在这些过程中应该能够继续获取。

(请注意,由于您的初始示例无法运行,因此我根本没有测试过其中任何一个。希望即使存在错误,意图也很明确。)

【讨论】:

  • 我很抱歉代码无法运行并且我的问题不够清晰。因为这个问题纯粹是理论上的,所以我现在可以无限并发parse。由于main_task 将与blockingCallFromThread 一起运行,因此它需要返回一个Deferred,它会在下载所有页面并解析所有源代码时回调,而不使用hackish 状态变量。我编辑了问题以包含此内容。
  • 如果你特别想去掉parse的并发限制,只需使用第一个例子,但用yield getPage(url). addCallback(lambda x: parse(x) and None)而不是yield getPage(url).addCallback(parse),这样parse中的Deferred就是显式丢弃而不是链接。这个答案是否解决了您的用例?
  • 这回答了这个问题。另外,感谢您在 Twisted 上所做的工作。这是我使用 Python 进行网络编程的唯一原因。 :]
  • +1 用于展示DeferredSemaphore 如何有用的简单示例。如果parse()s 花费的时间太长,那么 所有 url 获取停止 是否正确,即所有内容都挂在 parseSemaphore.acquire() 上,等待至少一个 parse() 完成?
猜你喜欢
  • 2016-01-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多