[Question title]: Lazy Deferred List reaching maximum recursion depth
[Posted]: 2013-03-15 14:23:15
[Question]:

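I have a large number of documents to insert into MongoDB (possibly n > 100000). I don't want to create 100000 deferreds at once, but I also don't want to run the queries sequentially and wait for each one, because I have a pool of connections to MongoDB and want to use it fully. So I have a generator function that yields deferreds, which are consumed by a DeferredLazyList.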

def generate_update_deferreds(collection, many_docs):
    for doc in many_docs:
        d = collection.update({'_id': doc['_id']}, doc, upsert=True)
        yield d

This is the code that ties the generator of upsert deferreds to the DeferredLazyList.

@defer.inlineCallbacks
def update_docs(collection, many_docs):
    gen_deferreds = generate_update_deferreds(collection, many_docs)
    results = yield DeferredLazyList(gen_deferreds, count=pool_size, consume_errors=True)

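DeferredLazyList is similar to DeferredList, but instead of accepting a list of deferreds to wait on, it accepts an iterator. Deferreds are retrieved from the iterator while only count deferreds are kept active at a time. This batches deferreds efficiently, because they are created only as they are yielded.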

class DeferredLazyList(defer.Deferred):
    """
    The ``DeferredLazyList`` class is used for collecting the results of
    many deferreds. This is similar to ``DeferredList``
    (``twisted.internet.defer.DeferredList``) but works with an iterator
    yielding deferreds. This will only maintain a certain number of
    deferreds simultaneously. Once one of the deferreds finishes, another
    will be obtained from the iterator.
    """

    def __init__(self, deferreds, count=None, consume_errors=None):
        defer.Deferred.__init__(self)

        if count is None:
            count = 1

        self.__consume_errors = bool(consume_errors)

        self.__iter = enumerate(deferreds)
        self.__results = []
        for _i in xrange(count):
            # Start specified number of simultaneous deferreds.
            if not self.called:
                self.__next_save_result(None, None, None)
            else:
                break

    def __next_save_result(self, result, success, index):
        """
        Called when a deferred completes.
        """
        # Make sure we can save result at index.
        if index is not None:
            results_len = len(self.__results)
            if results_len <= index:
                self.__results += [NO_RESULT] * (index - results_len + 1)
            # Save result.
            self.__results[index] = (success, result)

        # Get next deferred.
        try:
            i, d = self.__iter.next()
            d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))

        except StopIteration:
            # Iterator is exhausted, callback self with results.
            self.callback(self.__results)

        # Pass through result.
        return result if success or not self.__consume_errors else None

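The problem is that when the deferreds are yielded from generate_update_deferreds(), their .called is already set to True, which causes DeferredLazyList to call itself recursively.

What happens:

  1. In DeferredLazyList.__init__(), self.__next_save_result() is called count times (say 5).

  2. Each call to self.__next_save_result() consumes one deferred from self.__iter and adds itself as a callback.

  3. Because the yielded deferred has .called set to True, d.addCallbacks(self.__next_save_result, ...) immediately calls self.__next_save_result(), and this cycle continues until a RuntimeError is raised because the recursion limit has been reached.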
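
The three steps above can be reproduced without Twisted. The sketch below is a toy model, not Twisted code: FiredDeferred is a hypothetical stand-in that mimics only one behavior, namely that adding a callback to a deferred which already has a result runs the callback synchronously. drain_recursively follows the same shape as __next_save_result, while drain_with_guard shows one possible way to keep the stack flat (a re-entrancy guard, which is an assumption and not part of the original class).

```python
class FiredDeferred(object):
    """Toy stand-in (not Twisted's Deferred) for a deferred that already fired."""

    def __init__(self, result):
        self.result = result

    def addCallback(self, callback):
        # An already-fired deferred runs the callback right away,
        # inside the addCallback call itself.
        self.result = callback(self.result)
        return self


def drain_recursively(deferreds):
    """Same shape as __next_save_result: the callback fetches the next
    deferred and registers itself on it, so every item nests deeper."""
    iterator = iter(deferreds)
    handled = [0]

    def step(result):
        try:
            d = next(iterator)
        except StopIteration:
            return result
        handled[0] += 1
        d.addCallback(step)  # runs step() again before returning
        return result

    step(None)
    return handled[0]


def drain_with_guard(deferreds, count=1):
    """Refill free slots from a loop; a callback that fires while the loop
    is running only records the free slot and returns."""
    iterator = iter(deferreds)
    state = {"free_slots": count, "running": False, "handled": 0}

    def on_done(result):
        state["handled"] += 1
        state["free_slots"] += 1
        pump()  # returns at once if pump() is already looping
        return result

    def pump():
        if state["running"]:
            return
        state["running"] = True
        try:
            while state["free_slots"] > 0:
                try:
                    d = next(iterator)
                except StopIteration:
                    break
                state["free_slots"] -= 1
                d.addCallback(on_done)
        finally:
            state["running"] = False

    pump()
    return state["handled"]
```

In this model a small input works either way, but a large one exhausts the stack in drain_recursively while drain_with_guard finishes. With a real reactor the same guard would presumably also cover deferreds that fire later, since on_done would then find the pump idle and restart the loop.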

I printed a stack trace just before hitting the recursion limit to confirm that this is the cause:

File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/server.py", line 937, in update_many_docs
    results = yield DeferredLazyList(gen_deferreds, count=self.mongo_connections, consume_errors=True, return_results=True)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 157, in __init__
    self.__next_save_result(None, None, None)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
    d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks
    self._runCallbacks()
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
    d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 290, in addCallbacks
    self._runCallbacks()
File "/usr/lib/python2.7/dist-packages/twisted/internet/defer.py", line 551, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
File "/home/caleb/it/Development/projects/python/amazon/bin/feeds-daemon/lib/twisted.py", line 222, in __next_save_result
    d.addCallbacks(self.__next_save_result, self.__next_save_result, callbackArgs=(True, i), errbackArgs=(False, i))
# Repeated until the RuntimeError
exceptions.RuntimeError: maximum recursion depth exceeded

Any help would be greatly appreciated. By the way, I'm running Python 2.7.3 with Twisted 12.1.0, and the MongoDB part is really only relevant as context.


I wanted the result of each deferred, but cooperate() does not return them, so I added a callback to each deferred before handing them to the CooperativeTasks:

from twisted.internet.defer import DeferredList, inlineCallbacks
from twisted.internet.task import cooperate

NO_RESULT = object()

def generate_update_deferreds(collection, many_docs, save_results):
    for i, doc in enumerate(many_docs):
        d = collection.update({'_id': doc['_id']}, doc, upsert=True)
        d.addBoth(save_result, i, save_results) # Save result
        yield d

def save_result(result, i, save_results):
    save_results[i] = result

@inlineCallbacks
def update_docs(collection, many_docs):
    save_results = [NO_RESULT] * len(many_docs)
    gen_deferreds = generate_update_deferreds(collection, many_docs, save_results)
    # count is the number of concurrent workers, assumed to be defined elsewhere.
    workers = [cooperate(gen_deferreds).whenDone() for _i in xrange(count)]
    yield DeferredList(workers)
    # Handle save_results...


    Tags: python twisted twisted.internet


    [Answer 1]:

    Twisted has some tools that make this easier. For example, cooperate:

    from twisted.internet.defer import DeferredList
    from twisted.internet.task import cooperate
    
    def generate_update_deferreds(collection, many_docs):
        for doc in many_docs:
            d = collection.update({'_id': doc['_id']}, doc, upsert=True)
            yield d
    
    work = generate_update_deferreds(...)
    worker_tasks = []
    for i in range(count):
        task = cooperate(work)
        worker_tasks.append(task)
    
    all_done_deferred = DeferredList([task.whenDone() for task in worker_tasks])
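
One detail of the snippet above that is easy to miss: the same generator object is handed to cooperate() several times, yet no document should be processed twice. A generator is its own iterator, so every next() call advances shared state. The following stdlib-only sketch imitates that with a hypothetical round-robin loop standing in for Twisted's scheduler; generate_work and run_workers are illustrative names, not Twisted APIs.

```python
def generate_work(docs):
    # Stand-in for generate_update_deferreds: yields one unit of work per doc.
    for doc in docs:
        yield doc


def run_workers(work, count):
    """Let `count` workers take turns calling next() on one shared iterator
    and record which items each worker received."""
    taken = [[] for _ in range(count)]
    active = list(range(count))
    while active:
        for worker in list(active):
            try:
                taken[worker].append(next(work))
            except StopIteration:
                # The shared iterator is exhausted; this worker is done.
                active.remove(worker)
    return taken


assignments = run_workers(generate_work(range(7)), 3)
```

Each item ends up with exactly one worker, and at most count items are being handled at any moment, which is the bounded concurrency the question asks for.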
    
