【问题标题】:Twisted read file asynchronously异步扭曲读取文件
【发布时间】:2014-10-11 12:18:45
【问题描述】:

我想异步读取一个大文件,一次 20 行,我们如何使用扭曲框架来做到这一点?

我的代码的 sn-p,但被阻塞:

with open(file_path) as f:
    if (importState.status == ImportStateFile.STATUS_AWAY):
        f.seek(importState.fileIndex, 0)

    while True:
        importState.fileIndex = importState.fileIndex + len(''.join(emails))
        d1 = get_emails(f)
        d1.addCallback(process_emails).addCallback(insert_emails_status)
        d1.addErrback(finalize_import)

【问题讨论】:

    标签: python asynchronous twisted


    【解决方案1】:

    使用 Twisted Producer and Consumer System,点击此链接了解更多信息:http://twistedmatrix.com/documents/current/core/howto/producers.html

    我的制作人:

    @implementer(IBodyProducer)
    class ListEmailProducer(FileBodyProducer):
    
        def _writeloop(self, consumer):
            """
            Return an iterator which reads one chunk of bytes from the input file
            and writes them to the consumer for each time it is iterated.
            """
            while True:
                emails = list(islice(self._inputFile, self._readSize))
                if len(emails) == 0:
                    self._inputFile.close()
                    consumer.finish()
                    break
    
                consumer.write(emails)
                yield None
    

    我的消费者:

    class ListEmailConsumer():
        producer = None
        finished = False
        unregistered = True
        importState = None
    
        def registerImportState(self, importState):
            self.importState = importState
    
        def registerProducer(self, producer):
            self.producer = producer
    
        def unregisterProducer(self):
            self.unregistered = True
    
        def finish(self):
            finalize_import(self.importState)
            reactor.callFromThread(reactor.stop)
    
        def write(self, emails):
            self.producer.pauseProducing()
            d = process_emails(emails, self.importState)
            d.addCallback(insert_emails_status, self.importState)
            d.addCallback(lambda ignored: self.producer.resumeProducing())
    

    执行:

    fileObj = open(file_path)
    
    listEmailProducer = ListEmailProducer(fileObj, readSize=20)
    listEmailConsumer = ListEmailConsumer()
    listEmailConsumer.registerProducer(listEmailProducer)
    listEmailConsumer.registerImportState(importState)
    listEmailProducer.startProducing(listEmailConsumer)
    

    【讨论】:

    • 请记住,这段代码中的读取仍然是同步和阻塞的。他们只是保持很小。通常这已经足够了,因为磁盘通常速度很快,并且一次只读取一点点可以保护您免受一次读取整个巨大文件的问题。但请记住,有时磁盘很慢(事实上,如果磁盘是竞争资源,则读取可能需要任意长的时间 - 就像网络操作一样)。
    • @jean-paul-calderone 你是对的,这段代码是阻塞的,我认为使用延迟队列很好,你觉得呢?
    • 要完全非阻塞,您需要一个非阻塞文件 I/O API(您需要替换 fileObj.read(或者,为了更准确地匹配您的示例,next(iter(fileObj)))使用不会阻塞的东西。Python 没有这样的 API。您可以使用线程,也可以绑定特定于平台的 API 来执行此操作(并希望它有效 - stackoverflow.com/questions/87892/…
    • 由于磁盘阻塞,我的代码中有另一部分被阻塞,当我从文件中读取 20 封电子邮件时,我执行 20 个延迟线程来检查每封电子邮件并等待结果然后插入这 20向 DB 发送电子邮件,如何增强此功能?而是等待我们可以有一个缓冲区,如果这个缓冲区提高了它的限制,假设 20 然后插入到 DB。
    • 对不起,你把我弄丢了。您的问题您的答案中没有线索。如果你想问一个新问题,可以问一个新问题。
    猜你喜欢
    • 1970-01-01
    • 2016-04-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-10
    • 2011-05-31
    • 1970-01-01
    相关资源
    最近更新 更多