【问题标题】:Read file line by line with asyncio使用 asyncio 逐行读取文件
【发布时间】:2015-11-20 10:30:03
【问题描述】:

我希望在写入多个日志文件时读取它们并使用 asyncio 处理它们的输入。代码必须在 Windows 上运行。根据我对 stackoverflow 和网络搜索的了解,异步文件 I/O 在大多数操作系统上都很棘手(例如,select 将无法按预期工作)。虽然我确信我可以使用其他方法(例如线程)来做到这一点,但我会尝试使用 asyncio 来看看它是什么样的。最有用的答案可能是描述该问题的解决方案的“架构”应该是什么样的,即应该如何调用或调度不同的函数和协程。

下面给了我一个生成器,它逐行读取文件(通过轮询,这是可以接受的):

import time

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            time.sleep(POLL_INTERVAL)
            continue
        process_line(line)

要监视和处理多个文件,这种代码需要线程。我已经稍微修改了它,以便在 asyncio 中更有用:

import asyncio

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            yield from asyncio.sleep(POLL_INTERVAL)
            continue
        process_line(line)

当我通过 asyncio 事件循环安排它时,这种工作有效,但如果 process_data 阻塞,那当然不好。刚开始时,我想象解决方案看起来像

def process_data():
    ...
    while True:
        ...
        line = yield from line_reader()
        ...

但我不知道如何使它工作(至少在没有 process_data 管理相当多的状态的情况下是这样)。

关于我应该如何构建这种代码的任何想法?

【问题讨论】:

  • 我已经测试了代码的最顶层版本,它能够读取文件的更改。

标签: python python-asyncio


【解决方案1】:

使用aiofiles

async with aiofiles.open('filename', mode='r') as f:
    async for line in f:
        print(line)

编辑 1

正如@Jashandeep 提到的,你应该关心阻塞操作:

另一种方法是select 和或epoll

from select import select

files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], timeout=.1)

timeout 参数在这里很重要。

见:https://docs.python.org/3/library/select.html#select.select

编辑 2

您可以使用以下方式注册文件以进行读/写:loop.add_reader()

它在循环内部使用内部 EPOLL 处理程序。

编辑 3

但请记住,Epoll 不适用于常规文件。

【讨论】:

    【解决方案2】:

    根据我在 stackoverflow 和 Web 上的搜索结果,异步文件 I/O 在大多数操作系统上都很棘手(例如,select 无法按预期工作)。虽然我确信我可以使用其他方法(例如线程)来做到这一点,但我会尝试使用 asyncio 来看看它是什么样的。

    asyncio select 在底层基于 *nix 系统,因此您将无法在不使用线程的情况下进行非阻塞文件 I/O。在 Windows 上,asyncio 可以使用 IOCP,它支持非阻塞文件 I/O,但 asyncio 不支持。

    您的代码很好,除了您应该在线程中阻塞 I/O 调用,这样如果 I/O 很慢,您就不会阻塞事件循环。幸运的是,使用 loop.run_in_executor 函数将工作负载转移到线程非常简单。

    首先,为您的 I/O 设置一个专用线程池:

    from concurrent.futures import ThreadPoolExecutor
    io_pool_exc = ThreadPoolExecutor()
    

    然后简单地将任何阻塞 I/O 调用卸载到执行器:

    ...
    line = yield from loop.run_in_executor(io_pool_exc, f.readline)
    ...
    

    【讨论】:

      【解决方案3】:

      您的代码结构对我来说看起来不错,以下代码在我的机器上运行良好:

      import asyncio
      
      PERIOD = 0.5
      
      @asyncio.coroutine
      def readline(f):
          while True:
              data = f.readline()
              if data:
                  return data
              yield from asyncio.sleep(PERIOD)
      
      @asyncio.coroutine
      def test():
          with open('test.txt') as f:
              while True:
                  line = yield from readline(f)
                  print('Got: {!r}'.format(line))
      
      loop = asyncio.get_event_loop()
      loop.run_until_complete(test())
      

      【讨论】:

      • 看起来不错!我错过了我应该在您命名为test 的函数中打开文件,这解决了我的头痛问题。谢谢!
      • 我认为这有点误导 - 你实际上并没有卸载阅读本身。您只是在异步执行sleep。 IIUC。
      • @guyarad 你说得对。这个答案只是关于修复 OP 的代码。也许我会删除它。
      【解决方案4】:

      asyncio还不支持文件操作,抱歉。

      因此它无法帮助您解决问题。

      【讨论】:

      • 是的,但为了更具建设性,我可以批准该方法:loop.add_reader(在循环内共享 EPOLL 处理程序)足以在不阻塞整个线程的情况下逐块读取文件并实现最大性能,所以我不同意你的观点,Python 支持以异步方式读取文件,但有一些限制。
      • EPOLL 不支持 Linux 上的常规文件
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-09-11
      • 2019-03-18
      • 1970-01-01
      • 1970-01-01
      • 2014-06-21
      相关资源
      最近更新 更多