【问题标题】:Create file-like object for data buffer为数据缓冲区创建类似文件的对象
【发布时间】:2018-08-22 17:01:01
【问题描述】:

情境化

我正在编写一个程序,该程序能够从传感器读取数据,然后对其进行处理。目前我希望它被发送到服务器。我有两个通过套接字进行通信的进程,一个读取数据并将其存储到临时文件,另一个读取临时文件,将数据发送到服务器。

问题

这个问题实际上从未在测试中出现,但是我意识到如果采样频率很高,两个进程很有可能同时尝试读/写文件(不是他们完全同时请求它,而是一个尝试在另一个关闭它之前打开它)。

即使这不会引发错误(对于我在线阅读的内容,某些操作系统不会将锁放入文件中),它也可能导致巨大的版本不兼容错误,从而导致数据丢失。因此,这种处理数据的方式看起来不太合适。

我自己的想法/方法

我想在内存(数据缓冲区)中使用类似文件的对象。我对 Python 中的这个概念没有经验,所以我进行了一些研究,我了解到 [缓冲区] 就像一个文件,它在程序执行时保存在内存中,并且具有与标准系统非常相似的属性文件。我认为使用它可能是个好主意,但是我找不到解决其中一些不便的方法:

  1. 既然它仍然是 like 一个文件(类文件对象),如果两个进程在对象上的操作一致,会不会出现版本不兼容错误/错误?我只需要追加数据与一个进程(在最后)和从一开始就与另一个进程删除数据(作为某种队列)。这个 Python 功能是否允许这样做,如果允许,我可以在文档中查看哪些方法?

  2. 对于上面的解释,我考虑过使用队列;然而,这在时间方面可能是低效的执行(附加到列表相当快,但根据我在自己的机器上进行的测试以查看哪种对象类型最适合,附加到 pandas 对象的速度要慢大约 1000 倍)。是否有一个对象(如果不是类似文件的对象)可以让我这样做并且高效?我知道效率是主观的,所以假设每秒 100 次追加,没有明显的延迟(在这种情况下时间戳很重要)。

  3. 由于我使用了两个不同的进程并且它们在 Python 中不共享内存,在对类文件对象进行操作时是否仍然可以指向相同的内存地址?正如我所说,我用套接字与它们通信,但该方法是 afaik 按值调用,而不是引用;所以这对我来说似乎是一个严重的问题(也许有必要将它们合并到两个线程而不是不同的 python 进程中?)

如果需要,您可以评论询问任何其他细节,我将很乐意回答。

编辑:在 cmets 中提出的问题:

您是如何创建这些流程的?通过一个 Python 模块,比如 multiprocessingsubprocess,还是其他方式?

我将它们作为两个完全独立的程序运行。每个都有一个由 shell 脚本调用的不同的主要 python 文件;但是,如果需要,我可以灵活地更改此行为。

另一方面,从传感器读取数据的进程有两个线程:一个是从字面上读取数据,另一个是侦听套接字请求。

您从传感器获取什么类型的数据并将其发送到 服务器?

我发送的表格通常包含浮点数,但传感器也可能产生视频流或其他类型的数据结构。

对队列的误解 |熊猫

我知道队列与数据帧无关;我只是说我尝试使用数据帧但它表现不佳,因为它被认为预先分配了它需要的内存空间(如果我是对的)。我只是表达了我对解决方案性能的担忧。

【问题讨论】:

  • 您是如何创建这些流程的?通过 multiprocessingsubprocess 之类的 Python 模块,或其他方式?
  • 您需要当前使用的磁盘文件吗?您考虑并拒绝使用队列,因为附加到 pandas 对象的效率低下:这不是很清楚 - 您从传感器获取什么类型的数据并将其发送到服务器?
  • 顺便说一句,您误解了什么是类文件对象。类文件对象是提供类似于文件的 Python 接口的任何对象(通常是 readwrite 和逐行迭代等方法),无论对象背后的底层机制如何。它可以是像io.StringIO 这样的内存存储,也可以是普通文件,也可以是管道或套接字周围的包装器或许多其他东西。
  • @user2357112 谢谢你的解释:)。我当然误解了什么是类文件对象

标签: python python-3.x buffer


【解决方案1】:

首先,您确实正在考虑构建 io.BytesIO 已经在做的事情。它是一个完全存储在内存中的类似文件的对象。每个进程的对象完全独立于其他进程的对象。它就是你想要的一切。但这对你没有任何好处。它是一个类似文件的对象这一事实并不意味着它可以从其他进程中访问。事实上,这就是不是文件的类文件对象的全部:它们不是文件。

但您可以明确锁定您的文件。

确实,除了 Windows,大多数操作系统都不会自动锁定文件,有些甚至没有“强制”锁定,只有协同锁定,除非所有程序都编写好,否则不会真正保护文件使用锁。但这不是问题。

一种选择是为 Windows 和 Unix 编写单独的代码:在 Windows 上,依赖于以独占模式打开文件;在 Unix 上,使用flock

另一个选项是创建手动锁定文件。您可以自动尝试创建一个文件,如果其他人首先在每个平台上创建文件,只需使用带有O_CREAT|O_EXCL 标志的os.open,您就可以在此基础上构建您需要的所有其他内容。


如果您正在考虑使用共享内存,除非您使用的是 multiprocessing,否则以跨平台的方式这样做是相当痛苦的。

但是你可以通过使用普通文件并在每个进程中使用mmap来访问文件,就像访问普通内存一样,可以获得相同的效果。只要确保只使用 lengthaccess 的跨平台值(而不是使用特定于平台的参数,如 protflags),它在任何地方都以相同的方式工作。

当然,您不能将 Python 对象放入共享内存或 mmap,但您可以放入原始字节、“本机值”或它们的数组,或 ctypes Structures,或者,最好的所有,它们的多维numpy数组。除了最后一个,您可以使用the appropriate wrapper objects out of multiprocessing,即使您没有以其他方式使用该模块。对于最后一个,只需使用np.memmap 而不是直接使用mmap,它会处理所有事情。


但是,队列速度更快,您可能是对的。如果这真的是一个问题(尽管我实际上会在解决它之前构建并测试它是否是一个问题......),然后去做。但是您似乎对此有一些误解。

首先,我不知道您为什么认为队列与附加到 pandas DataFrames 有任何关系。我想您可以使用 df 作为队列,但两者之间没有内在联系。

同时,列表适合小队列,但对于非常大的队列则不然。要么附加到右侧并从左侧弹出,要么附加到左侧并从右侧弹出。无论哪种方式,左侧的操作都需要与队列大小成线性关系的时间,因为您必须将列表的其余部分向左或向右移动一个插槽。解决这个问题的方法是collections.deque,这个对象几乎和列表一样,只是它可以在两侧进行恒定时间的插入或删除,而不是右侧。

但同样,这并不能解决任何问题,因为它实际上并没有以任何方式共享。您需要某种 interprocess 队列,而 DataFrame 和列表(或双端队列)都无济于事。

您可以在管道之上构建一个进程间队列。根据您的进程的运行方式,这可能是一个匿名管道,其中启动程序将管道的一端交给子程序,或者这可能是一个命名管道,这在 Windows 和 Unix 上略有不同,但在在这两种情况下,这两个程序都可以使用一些全球知名的名称(如文件系统路径)来打开同一个管道。

您还可以在 TCP 套接字之上构建进程间队列。如果绑定到localhost,连接到localhost,这几乎和管道一样高效,但是跨平台编写更简单。

那么,如何在管道或套接字之上构建队列?唯一的问题是您只有一个字节流而不是消息流。

  • 如果您的消息大小相同,您只需将sendall 放在一侧,然后将recv 循环,直到有MESSAGESIZE 字节。
  • 如果它们是像pickle这样的自定界格式,没有问题;只是sendall 在一侧,recv 直到你在另一侧有一个完整的泡菜。您甚至可以使用socket.makefile(当然,仅适用于套接字,而不是管道)来获取可以直接传递给pickle.dump‘ andpickle.load 的类似文件的对象。
  • 您可以使用某种分隔符(例如,如果您的消息是永远不能包含换行符或永远不能包含 NUL 字节的文本,您可以只使用换行符或 0 作为分隔符 - 如果您使用换行符, makefile 再次为您解决此问题。
  • 或者您可以在消息本身之前发送每条消息的大小(例如,使用像 netstring 这样的普通协议)。

如果您正在(或可能)使用multiprocessing 库来控制所有单独的进程,它带有一个内置的Queue 类,它在通过管道发送泡菜的基础上构建一个IPC 队列适用于每个主要平台的有效方式,但您不必担心它是如何工作的;您只需将队列交给您的子进程,您可以在一端使用put,在另一端使用get,这样就可以了。

【讨论】:

  • 您似乎没有解决问题的整个进程间通信方面。 collections.deque 作为进程间通信通道没有意义。
  • @user2357112 啊,他花了这么多时间谈论防止任何我没有意识到他需要一些进程间共享,只是需要保护它。我会删除不相关的部分。
  • 对不起,如果我没有非常恰当地起草问题。有很多想法需要考虑。至于你的回复,我看到我有两个主要选择:要么只使用一个进程并使用一个队列,要么实现某种文件锁定机制。如果我使用multiprocessing 运行它们,进程是否可以共享内存?
  • @J.C.Rocamonde 不,您可以使用多个进程并构建 IPC 队列,或者使用多个进程并锁定您的文件。
  • @J.C.Rocamonde 如果您可以根据子进程(或它们的池)重新考虑您的程序,您可以使用multiprocessing 来简化几乎所有这些解决方案。这实际上不是必需,但它可能会让您的生活更轻松。
【解决方案2】:

您误解了什么是类文件对象。 “类文件对象”描述了对象呈现的接口——方法如readwrite,以及逐行迭代。它没有说明它是否将数据存储在内存中。常规文件对象是类文件对象。操作系统级管道的文件对象是类文件对象。 io.StringIOio.BytesIO 对象是类似文件的对象,它们实际上确实像您想的那样工作。

与其考虑类似文件的对象,不如考虑一下要使用什么操作系统级别的机制在进程之间进行通信。你已经有了插座;为什么不使用套接字在进程之间发送数据?管道将是另一种选择。共享内存是可能的,但依赖于平台且很棘手;这可能不是最好的选择。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-07-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-11-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多