【问题标题】:Fast extraction of chunks of lines from large CSV file从大型 CSV 文件中快速提取行块
【发布时间】:2016-12-19 10:48:08
【问题描述】:

我有一个大的 CSV 文件,其中包含格式如下的股票相关数据:

股票代码、日期、[一些变量...]

因此,每一行都以符号(如“AMZN”)开头,然后是日期,然后是与所选日期的价格或交易量相关的 12 个变量。该文件中代表了大约 10,000 种不同的证券,我每天都有一条线表示股票已公开交易。该文件首先按股票代码的字母顺序排序,然后按时间顺序按日期排序。整个文件大约 3.3 GB。

我想要解决的任务是能够提取给定股票代码相对于当前日期的最新 n 行数据。我有执行此操作的代码,但根据我的观察,每次检索似乎平均需要大约 8-10 秒(所有测试都提取了 100 行)。

我有一些我想运行的功能需要我为成百上千个符号抓取这样的块,我真的很想减少时间。我的代码效率低下,但我不知道如何让它运行得更快。

首先,我有一个名为 getData 的函数:

def getData(symbol, filename):
  out = ["Symbol","Date","Open","High","Low","Close","Volume","Dividend",
         "Split","Adj_Open","Adj_High","Adj_Low","Adj_Close","Adj_Volume"]
  l = len(symbol)
  beforeMatch = True
  with open(filename, 'r') as f:
    for line in f:
        match = checkMatch(symbol, l, line)
        if beforeMatch and match:
            beforeMatch = False
            out.append(formatLineData(line[:-1].split(",")))
        elif not beforeMatch and match:
            out.append(formatLineData(line[:-1].split(",")))
        elif not beforeMatch and not match:
            break
  return out

(这段代码有几个辅助函数,checkMatch 和 formatLineData,我将在下面展示。)然后,还有另一个名为 getDataColumn 的函数可以获取我想要的列,并表示正确的天数:

def getDataColumn(symbol, col=12, numDays=100, changeRateTransform=False):
  dataset = getData(symbol)
  if not changeRateTransform:
    column = [day[col] for day in dataset[-numDays:]]
  else:
    n = len(dataset)
    column = [(dataset[i][col] - dataset[i-1][col])/dataset[i-1][col] for i in range(n - numDays, n)]
  return column

(如果为真,changeRateTransform 将原始数字转换为每日变化率数字。)辅助函数:

def checkMatch(symbol, symbolLength, line):
  out = False
  if line[:symbolLength+1] == symbol + ",":
    out = True
  return out

def formatLineData(lineData):
  out = [lineData[0]]
  out.append(datetime.strptime(lineData[1], '%Y-%m-%d').date())
  out += [float(d) for d in lineData[2:6]]
  out += [int(float(d)) for d in lineData[6:9]]
  out += [float(d) for d in lineData[9:13]]
  out.append(int(float(lineData[13])))
  return out

是否有人对我的代码的哪些部分运行缓慢以及如何使其表现更好有任何见解?如果不加快速度,我就无法进行我想做的那种分析。


编辑: 为了响应 cmets,我对代码进行了一些更改,以便利用 csv 模块中的现有方法:

def getData(symbol, database):
  out = ["Symbol","Date","Open","High","Low","Close","Volume","Dividend",
         "Split","Adj_Open","Adj_High","Adj_Low","Adj_Close","Adj_Volume"]
  l = len(symbol)
  beforeMatch = True
  with open(database, 'r') as f:
    databaseReader = csv.reader(f, delimiter=",")
    for row in databaseReader:
        match = (row[0] == symbol)
        if beforeMatch and match:
            beforeMatch = False
            out.append(formatLineData(row))
        elif not beforeMatch and match:
            out.append(formatLineData(row))
        elif not beforeMatch and not match:
            break
  return out

def getDataColumn(dataset, col=12, numDays=100, changeRateTransform=False):
  if not changeRateTransform:
    out = [day[col] for day in dataset[-numDays:]]
  else:
    n = len(dataset)
    out = [(dataset[i][col] - dataset[i-1][col])/dataset[i-1][col] for i in range(n - numDays, n)]
  return out

使用 csv.reader 类的性能更差。我测试了两只股票,AMZN(靠近文件顶部)和 ZNGA(靠近文件底部)。使用原始方法,运行时间分别为 0.99 秒和 18.37 秒。使用利用 csv 模块的新方法,运行时间分别为 3.04 秒和 64.94 秒。两者都返回正确的结果。

我的想法是,查找库存比解析所占用的时间更多。如果我在文件 A 中的第一只股票上尝试这些方法,这些方法都在大约 0.12 秒内运行。

【问题讨论】:

  • 你应该使用 csv 模块而不是在这里滚动你自己的。 pandas csv 阅读器还可以大大加快摄取速度。
  • 对于重复查询,另一个选项是将整个内容放入 sqlite 并添加一些索引。
  • 您可以考虑使用生成器,将所有out.append(...) 替换为yield ...,以便函数可以延迟执行(为不断增长的列表重新分配空间更少)
  • 我会将其全部写入数据库,然后查询所需的输出。
  • @JeffDavis 是的,这使得缩小特定性能问题的范围有点棘手。但我认为我们已经确定了几个不同的领域。一是摄取速度——我认为 pandas 很好地涵盖了这一点。另一个是处理速度,如果您的数据集是内存中的数据集,pandas 可能会再次覆盖该处理速度。最后,这是关于静态数据集还是持续更新的数据集。

标签: python performance csv


【解决方案1】:

当您要对同一个数据集进行大量分析时,实用的方法是将其全部读入数据库。它是为快速查询而设计的; CSV 不是。使用 sqlite 命令行工具,例如可以直接从 CSV 导入。然后在(Symbol, Date) 上添加一个索引,查找几乎是即时的。

如果由于某种原因不可行,例如因为新文件随时可能进入,而您无法在开始分析它们之前准备好时间,则必须尽量直接处理 CSV ,这是我的其余答案将重点关注的内容。但请记住,这是一种平衡行为。要么你先付很多钱,要么为每次查找多付一点钱。最终,对于一定数量的查询,预付费用会更便宜。

优化是指最大化未完成的工作量。在这种情况下,使用生成器和内置的 csv 模块并没有多大帮助。您仍然会阅读整个文件并解析所有文件,至少对于换行符。有了这么多的数据,这是不行的。

解析需要阅读,因此您必须首先找到解决方法。将所有错综复杂的 CSV 格式留给专用模块的最佳实践在无法为您提供所需性能时毫无意义。必须做一些作弊,但要尽可能少。在这种情况下,我认为可以安全地假设新行的开头可以标识为b'\n"AMZN",'(坚持您的示例)。是的,这里是二进制,因为记住:还没有解析。您可以从头开始扫描文件作为二进制文件,直到找到第一行。从那里读取您需要的行数,以正确的方式解码和解析它们,等等。不需要在那里进行优化,因为与您不这样做的数十万行无关的行相比,100 行没什么好担心的工作。

放弃所有的解析会给你带来很多好处,但阅读也需要优化。不要先将整个文件加载到内存中,然后尽可能多地跳过 Python 层。使用mmap 可以让操作系统透明地决定将哪些内容加载到内存中,并让您直接处理数据。

如果符号接近结尾,您仍然可能会读取整个文件。这是一个线性搜索,这意味着它所花费的时间与文件中的行数成线性比例。不过你可以做得更好。由于文件已排序,因此您可以改进该功能以执行一种二进制搜索。将采取的步骤数(其中一个步骤正在读取一行)接近于行数的二进制对数。换句话说:您可以将文件分成两个(几乎)大小相同的部分的次数。当有一百万行时,相差五个数量级!

这是我根据 Python 自己的 bisect_left 提出的,并采取了一些措施来解释您的“值”跨越多个索引这一事实:

import csv
from itertools import islice
import mmap

def iter_symbol_lines(f, symbol):
    # How to recognize the start of a line of interest
    ident = b'"' + symbol.encode() + b'",'
    # The memory-mapped file
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    # Skip the header
    mm.readline()
    # The inclusive lower bound of the byte range we're still interested in
    lo = mm.tell()
    # The exclusive upper bound of the byte range we're still interested in
    hi = mm.size()
    # As long as the range isn't empty
    while lo < hi:
        # Find the position of the beginning of a line near the middle of the range
        mid = mm.rfind(b'\n', 0, (lo+hi)//2) + 1
        # Go to that position
        mm.seek(mid)
        # Is it a line that comes before lines we're interested in?
        if mm.readline() < ident:
            # If so, ignore everything up to right after this line
            lo = mm.tell()
        else:
            # Otherwise, ignore everything from right before this line
            hi = mid
    # We found where the first line of interest would be expected; go there
    mm.seek(lo)
    while True:
        line = mm.readline()
        if not line.startswith(ident):
            break
        yield line.decode()

with open(filename) as f:
    r = csv.reader(islice(iter_symbol_lines(f, 'AMZN'), 10))
    for line in r:
        print(line)

对此代码不做任何保证;我没有过多关注边缘情况,也无法使用(任何)您的文件进行测试,因此将其视为概念证明。但是,它的速度非常快——想想 SSD 上的几十毫秒!

【讨论】:

  • 由于您正在努力通过所有选项 - pandas.read_table 非常快(假设选项允许 engine='c' 和适当大小的 chunksize
  • @pvg 我实际上用 3.5GB 文件尝试了pandas,但并不能真正让它表现良好(即,即使分块也不会占用我所有的内存)。
  • 这很奇怪。我从它的摄取上获得了相当一致的 2 倍甚至更高的改进。我不认为摄取是这里的根本问题,但我们得到如此不同的数字似乎很奇怪。
  • @pvg 当然必须有某种方法来解决这个问题,但我不确定它会给我的答案添加什么。它主要专注于使 CSV 工作得足够好,我认为现在可以了。 :)
  • 是的,你是对的,而且我越来越相信(来自海报的 cmets)这不是关于摄取的真正问题。
【解决方案2】:

所以我有一个替代解决方案,我自己运行和测试了一个我在 Quandl 上获得的示例数据集,该数据集似乎具有所有相同的标题和相似的数据。 (假设我没有误解您试图达到的最终结果)。

我有一个我们的工程师为我们构建的用于解析大量 csv 的命令行工具——因为我每天都在处理大量荒谬的数据——它是开源的,你可以在这里获得它:@987654321 @

我也已经为它编写了简短的 bash 脚本,以防您不想流水线化命令,但它也支持流水线化。

运行以下短脚本的命令遵循一个超级简单的约定:

bash tickers.sh wikiprices.csv 'AMZN' '2016-12-\d+|2016-11-\d+'

#!/bin/bash


dates="$3"
cat "$1" \
  | gocsv filter --columns 'ticker' --regex "$2" \
  | gocsv filter --columns 'date' --regex "$dates" > "$2"'-out.csv'
  • ticker 和 dates 的参数都是正则表达式
  • 您可以在该正则表达式中添加任意数量的变体,并用| 分隔它们。
  • 因此,如果您想要 AMZN 和 MSFT,那么您只需将其修改为:AMZN|MSFT

  • 我对日期做了非常相似的事情 - 但我只将样本运行限制在本月或上个月的任何日期。

最终结果

起始数据:

myusername$ gocsv dims wikiprices.csv    
Dimensions:
  Rows: 23946
  Columns: 14

myusername$ bash tickers.sh wikiprices.csv 'AMZN|MSFT' '2016-12-\d+'

myusername$ gocsv dims AMZN|MSFT-out.csv
Dimensions:
  Rows: 24
  Columns: 14

这是一个示例,我仅限于这 2 个代码,然后仅限于 12 月:

瞧 - 在几秒钟内,您就保存了第二个文件,其中没有您关心的数据。

顺便说一句,gocsv 程序有很好的文档 - 以及大量其他功能,例如基本上以任何规模运行 vlookup(这就是激发创作者制作工具的原因)

【讨论】:

  • 非常感谢 - 这看起来很棒。我一定会在 gocsv 上学习。你是对的,这是 Quandl 数据。
  • ;) 他们不称我为数据忍者哈哈
  • 如果您有任何问题等,也可以随时联系我 - 特别是因为 gocsv 来自我们的回购 - datafox.co 的 aurielle
  • 非常感谢,我会这样做的!
【解决方案3】:

除了使用csv.reader 之外,我认为使用itertools.groupby 会加快查找所需部分的速度,因此实际迭代可能如下所示:

import csv
from itertools import groupby 
from operator import itemgetter #for the keyfunc for groupby

def getData(wanted_symbol, filename):
    with open(filename) as file:
        reader = csv.reader(file)
        #so each line in reader is basically line[:-1].split(",") from the plain file
        for symb, lines in groupby(reader, itemgetter(0)):
            #so here symb is the symbol at the start of each line of lines
            #and lines is the lines that all have that symbol in common
            if symb != wanted_symbol:
                continue #skip this whole section if it has a different symbol
            for line in lines:
                #here we have each line as a list of fields
                #for only the lines that have `wanted_symbol` as the first element
                <DO STUFF HERE>

所以在&lt;DO STUFF HERE&gt; 的空间中,您可以使用out.append(formatLineData(line)) 来执行您当前代码的操作,但是该函数的代码有很多不必要的切片和+= 运算符,我认为这对于列表来说非常昂贵(可能是错误的),您可以应用转换的另一种方法是列出所有转换:

def conv_date(date_str):
    return datetime.strptime(date_str, '%Y-%m-%d').date()

#the conversions applied to each element (taken from original formatLineData)
castings = [str, conv_date,             #0, 1
            float, float, float, float, #2:6
            int, int, int,              #6:9
            float, float, float, float, #9:13
            int]                        #13

然后使用zip 将这些应用于列表理解中一行中的每个字段:

 [conv(val) for conv, val in zip(castings, line)]

因此,您可以将 &lt;DO STUFF HERE&gt; 替换为 out.append 与该理解。


我还想知道切换groupbyreader 的顺序是否会更好,因为您不需要将大部分文件解析为csv,只需将您实际迭代的部分解析为您可以使用一个 keyfunc,只分隔字符串的第一个字段

def getData(wanted_symbol, filename):
    out = [] #why are you starting this with strings in it?
    def checkMatch(line): #define the function to only take the line
        #this would be the keyfunc for groupby in this example
        return line.split(",",1)[0] #only split once, return the first element

    with open(filename) as file:
        for symb, lines in groupby(file,checkMatch):
            #so here symb is the symbol at the start of each line of lines
            if symb != wanted_symbol:
                continue #skip this whole section if it has a different symbol
            for line in csv.reader(lines):
                out.append(  [typ(val) for typ,val in zip(castings,line)]  )
    return out

【讨论】:

  • 感谢您的帖子,但这段代码并没有减少我的运行时间。编辑后的版本运行速度更快,但 AMZN 仍需约 42 秒,ZNGA 仍需约 47 秒。我非常感谢这篇文章向我展示了如何使用我以前没有使用过的 groupby、itemgetter 和 zip。
  • 好的,这样可以确认大部分时间都花在了查找这些部分上,我想知道是否找到第一次出现的类似 whole_file = f.reead() ; start_i = whole_file.index(symbol) 然后通过切片从 start_i 开始解析会产生更好的结果,当然这需要将整个文件或文件的大块存储在内存中,如果这不能加快处理速度,那么我认为问题可能只是你的硬盘驱动器很慢......
  • @TadhgMcDonald-Jensen 你可以mmap 看看它是否有所作为。但我认为根本问题是整个事情是不可重现的,除了猜测之外,几乎没有什么好的选择。
  • @TadhgMcDonald-Jensen 我绝对没有使用非常高性能的机器,所以硬盘驱动器可能是一个限制。如何确定我的机器是否可以将整个文件放入内存?
  • 嗯...我假设如果你在 RAM 中至少有文件的大小,你就可以将它全部加载到内存中,仍然将整个 3.3GB 文件加载到内存中是'即使可能也不推荐,pvg 建议使用mmap 肯定会比加载整个内容更好。 (让您直接将文件视为字符串)
猜你喜欢
  • 2018-11-01
  • 2018-12-09
  • 2020-03-15
  • 2011-03-29
  • 1970-01-01
  • 1970-01-01
  • 2021-04-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多