【问题标题】:Fast way to read interleaved data?读取交错数据的快速方法?
【发布时间】:2011-05-12 19:05:57
【问题描述】:

我有一个包含多个数据通道的文件。文件以基本速率采样,每个通道都以该基本速率除以某个数字进行采样——它似乎总是 2 的幂,尽管我认为这并不重要。

所以,如果我有频道 abc,在 1、2 和 4 的分隔符处采样,我的流看起来像:

a0 b0 c0 a1 a2 b1 a3 a4 b2 c1 a5 ...

为了增加乐趣,通道可以独立地是浮点数或整数(尽管我知道每个通道),并且数据流不一定以 2 的幂结束:示例流无需进一步扩展即可有效。这些值有时很大,有时是小端,尽管我知道我要预先处理什么。

我有代码可以正确解包这些并用正确的值填充 numpy 数组,但它很慢:它看起来像(希望我没有掩饰太多;只是给出算法的想法):

for sample_num in range(total_samples):
    channels_to_sample = [ch for ch in all_channels if ch.samples_for(sample_num)]
    format_str = ... # build format string from channels_to_sample
    data = struct.unpack( my_file.read( ... ) ) # read and unpack the data
    # iterate over data tuple and put values in channels_to_sample
    for val, ch in zip(data, channels_to_sample):
        ch.data[sample_num / ch.divider] = val

而且速度很慢——在我的笔记本电脑上读取一个 20MB 的文件需要几秒钟。 Profiler 告诉我我在Channel#samples_for() 上花了很多时间——这是有道理的;那里有一点条件逻辑。

我的大脑感觉有一种方法可以一举完成,而不是嵌套循环——也许使用索引技巧将我想要的字节读入每个数组?构建一个庞大的、疯狂的格式字符串的想法似乎也是一条值得商榷的道路。

更新

感谢回复的人。值得一提的是,numpy 索引技巧将读取我的测试数据所需的时间从大约 10 秒减少到大约 0.2 秒,速度提高了 50 倍。

【问题讨论】:

  • 为什么有人要这样对你?
  • 如果 ch.samples_for 是问题所在,那么我们需要查看该函数才能知道如何为您提供帮助。顺便问一下,你有多少个频道?
  • 您现在是否实际上每次通过循环只读取一个频道?起初我以为不是这样,但现在我不清楚了。
  • 啊,这些文件来自一个嵌入式系统,该系统读取生理数据(心率、呼吸等)以及一些数字信号——我们在记录生理信号的同时进行功能性核磁共振和心理测试。这正是我们得到的。
  • 现在,我正在阅读,分组,(a0 b0 c0) (a1) (a2 b1) (a3) (a4 b2 c1) (a5)

标签: python optimization numpy binary-data


【解决方案1】:

真正提高性能的最佳方法是摆脱所有样本的 Python 循环,让 NumPy 在编译的 C 代码中执行此循环。这实现起来有点棘手,但这是可能的。

首先,您需要做一些准备。正如贾斯汀·皮尔(Justin Peel)所指出的,样本排列的模式在经过一些步骤后会重复。如果 d_1, ..., d_k 是您的 k 个数据流的除数,b_1, ..., b_k 是流的样本大小(以字节为单位),并且 lcm 是这些除数的最小公倍数,那么

N = lcm*sum(b_1/d_1+...+b_k/d_k)

将是流模式将在之后重复的字节数。如果您已经确定了前 N 个字节中的每个字节属于哪个流,则可以简单地重复此模式。

您现在可以通过类似于

的方式为前 N 个字节构建流索引数组
stream_index = []
for sample_num in range(lcm):
    stream_index += [i for i, ch in enumerate(all_channels)
                     if ch.samples_for(sample_num)]
repeat_count = [b[i] for i in stream_index]
stream_index = numpy.array(stream_index).repeat(repeat_count)

这里,d 是序列 d_1, ..., d_k,b 是序列 b_1, ..., b_k。

现在你可以做

data = numpy.fromfile(my_file, dtype=numpy.uint8).reshape(-1, N)
streams = [data[:,stream_index == i].ravel() for i in range(k)]

您可能需要在末尾稍微填充数据以使reshape() 工作。

现在,您将属于每个流的所有字节都放在了单独的 NumPy 数组中。您可以通过简单地分配给每个流的dtype 属性来重新解释数据。如果您希望将第一个流解释为大端整数,只需编写

streams[0].dtype = ">i"

这不会以任何方式改变数组中的数据,只会改变它的解释方式。

这可能看起来有点神秘,但在性能方面应该会更好。

【讨论】:

  • 我的流可能包含混合数据类型(int16 和 float64)——数据可能是一个字节数组。这种方法仍然可行吗?
  • 哦,我刚刚注意到我没有仔细阅读您的代码。我将更新我的答案以反映这一点......是的,仍然有可能......
  • 甜蜜。这看起来就是我想要的——不过,在它完全有意义之前,我需要做一些思考。我有点困惑:“前 N 个字节属于哪个流”——前 N 个字节属于任何一个流;保证 N 个字节的运行属于每个流。
  • 哦,这似乎太混乱了。英语不是我的第一语言,但我会努力改进解释......
  • 不用担心——真的,我应该试着把它编码,看看它会把我带到哪里。基本上,我会阅读整个内容,将其分成 N 个字节长的块,然后对其进行一些索引,对吗?
【解决方案2】:

channel.samples_for(sample_num) 替换为iter_channels(channels_config) 迭代器,该迭代器保留一些内部状态并让您一次性读取文件。像这样使用它:

for (chan, sample_data) in izip(iter_channels(), data):
    decoded_data = chan.decode(sample_data)

要实现迭代器,请考虑一个周期为 1 的基本时钟。各个通道的周期是整数。按顺序迭代通道,如果时钟模其周期为零,则发出通道。

for i in itertools.count():
    for chan in channels:
        if i % chan.period == 0:
            yield chan

【讨论】:

  • 所以,在这个解决方案中,我不会计算到 total_samples (这是基本速率下的样本总数),而是计算到文件中我期望的样本总数 -所有通道的长度之和。对吗?
  • 时钟可以计数到无穷大,这就是 itertools.count() 所做的。 izip 会在文件用完时丢弃它。
  • 我不能依赖文件结尾——在我感兴趣的数据结束后经常会有废话。但我可以计算出“我想读的东西”的总数和数到这一点。
【解决方案3】:

grouper() recipeitertools.izip() 在这里应该会有所帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-05-17
    • 2013-01-17
    • 2011-01-11
    • 1970-01-01
    • 1970-01-01
    • 2013-08-19
    • 2018-10-04
    • 1970-01-01
    相关资源
    最近更新 更多