【问题标题】:Reading large CSV files from nth line in Python (not from the beginning)从 Python 中的第 n 行读取大型 CSV 文件(不是从头开始)
【发布时间】:2017-06-23 03:05:41
【问题描述】:

我有 3 个包含气候数据的巨大 CSV 文件,每个大约 5GB。 每行中的第一个单元格是气象站的编号(从 0 到大约 100,000)每个站在每个文件中包含 1 到 800 行,这在所有文件中不一定相等。例如,Station 11 在 file1、file2 和 file3 中分别有 600、500 和 200 行。 我想读取每个站的所有行,对它们进行一些操作,然后将结果写入另一个文件,然后是下一个站,等等。 这些文件太大而无法在内存中一次加载,因此我尝试了一些解决方案以最小的内存负载读取它们,例如 this postthis post 包括此方法:

with open(...) as f:
    for line in f:
        <do something with line> 

这种方法的问题是每次都是从头开始读取文件,而我想读取文件如下:

for station in range (100798):
    with open (file1) as f1, open (file2) as f2, open (file3) as f3:
        for line in f1:
            st = line.split(",")[0]
            if st == station:
                <store this line for some analysis>
            else:
                break   # break the for loop and go to read the next file
        for line in f2:
            ...
            <similar code to f1>
            ...
        for line in f3:
            ...
            <similar code to f1>
            ...
    <do the analysis to station, the go to next station>

问题是每次我重新开始下一站时,for循环都会从头开始,而我希望它从第n行发生'Break'的地方开始,即继续读取文件.

我该怎么做?

提前致谢

关于以下解决方案的注意事项: 正如我在发布答案时在下面提到的那样,我实现了@DerFaizio 的答案,但我发现它的处理速度非常慢。

在我尝试了@PM_2Ring 提交的基于生成器的答案后,我发现它非常快。也许是因为它依赖于生成器。

两种解决方案之间的差异可以通过每分钟处理的站数来观察基于生成器的解决方案为 2500 st/min,而基于 Pandas 的解决方案为 45 st/min。其中基于生成器的解决方案要快 >55 倍

我将保留下面的两个实现以供参考。 非常感谢所有贡献者,尤其是@PM_2Ring。

【问题讨论】:

  • 您可以使用f1.tell() 存储文件位置,并在下次查找。
  • 谢谢@Jean-FrançoisFabre,但是,这需要很长时间,因为每个文件包含超过 5 亿行。并且不存储 file.tell() 位置,我可以在排序时再次搜索站号。再次感谢您的建议,但我认为有更好的解决方案。
  • 问题在于这些行的大小是可变的,因此要到达第 N 行,您必须至少遍历所有先前的行一次(然后缓存结果)。祝你好运。
  • 但是你的with 阻塞了外部for循环。每次退出并重新进入with块时,文件都会关闭并重新打开,因此它们会从头开始。
  • 这3个文件中的每一个都包含range(100798)中每个站号的数据,每个文件中的数据行按站号排序。对吗?

标签: python performance csv bigdata


【解决方案1】:

下面的代码逐行遍历文件,依次从每个文件中获取每个站点的行并将它们附加到列表中以供进一步处理。

这段代码的核心是一个生成器file_buff,它产生文件的行,但它允许我们将一行推回以供以后阅读。当我们读取下一站的线路时,我们可以将其发送回file_buff,以便在需要处理该站的线路时重新读取。

为了测试这段代码,我使用create_data 创建了一些简单的假站数据。

from random import seed, randrange

seed(123)

station_hi = 5
def create_data():
    ''' Fill 3 files with fake station data '''
    fbase = 'datafile_'
    for fnum in range(1, 4):
        with open(fbase + str(fnum), 'w') as f:
            for snum in range(station_hi):
                for i in range(randrange(1, 4)):
                    s = '{1} data{0}{1}{2}'.format(fnum, snum, i)
                    print(s)
                    f.write(s + '\n')
        print()

create_data()

# A file buffer that you can push lines back to
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

# An infinite counter that yields numbers converted to strings
def str_count(start=0):
    n = start
    while True: 
        yield str(n)
        n += 1

# Extract station data from all 3 files
with open('datafile_1') as f1, open('datafile_2') as f2, open('datafile_3') as f3:
    fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3)

    for snum_str in str_count():
        station_lines = []
        for fb in (fb1, fb2, fb3):
            for line in fb:
                #Extract station number string & station data
                sid, sdata = line.split()
                if sid != snum_str:
                    # This line contains data for the next station,
                    # so push it back to the buffer
                    rc = fb.send(line)
                    # and go to the next file
                    break
                # Otherwise, append this data
                station_lines.append(sdata)

        #Process all the data lines for this station
        if not station_lines:
            #There's no more data to process
            break
        print('Station', snum_str)
        print(station_lines)

输出

0 data100
1 data110
1 data111
2 data120
3 data130
3 data131
4 data140
4 data141

0 data200
1 data210
2 data220
2 data221
3 data230
3 data231
3 data232
4 data240
4 data241
4 data242

0 data300
0 data301
1 data310
1 data311
2 data320
3 data330
4 data340

Station 0
['data100', 'data200', 'data300', 'data301']
Station 1
['data110', 'data111', 'data210', 'data310', 'data311']
Station 2
['data120', 'data220', 'data221', 'data320']
Station 3
['data130', 'data131', 'data230', 'data231', 'data232', 'data330']
Station 4
['data140', 'data141', 'data240', 'data241', 'data242', 'data340']

如果其中一个或两个文件中缺少特定站点的站点数据,则此代码可以处理,但如果所有三个文件中都缺少站点数据,则无法处理,因为当station_lines 列表为空时,它会中断主处理循环,但这对您的数据来说应该不是问题。


有关生成器和generator.send 方法的详细信息,请参阅文档中的6.2.9. Yield expressions

此代码是使用 Python 3 开发的,但它也可以在 Python 2.6+ 上运行(您只需在脚本顶部包含 from __future__ import print_function)。


如果所有 3 个文件中可能缺少站点 ID,我们可以轻松处理。只需使用简单的range 循环而不是无限的str_count 生成器。

from random import seed, randrange

seed(123)

station_hi = 7
def create_data():
    ''' Fill 3 files with fake station data '''
    fbase = 'datafile_'
    for fnum in range(1, 4):
        with open(fbase + str(fnum), 'w') as f:
            for snum in range(station_hi):
                for i in range(randrange(0, 2)):
                    s = '{1} data{0}{1}{2}'.format(fnum, snum, i)
                    print(s)
                    f.write(s + '\n')
        print()

create_data()

# A file buffer that you can push lines back to
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

station_start = 0
station_stop = station_hi

# Extract station data from all 3 files
with open('datafile_1') as f1, open('datafile_2') as f2, open('datafile_3') as f3:
    fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3)

    for i in range(station_start, station_stop):
        snum_str = str(i)
        station_lines = []
        for fb in (fb1, fb2, fb3):
            for line in fb:
                #Extract station number string & station data
                sid, sdata = line.split()
                if sid != snum_str:
                    # This line contains data for the next station,
                    # so push it back to the buffer
                    rc = fb.send(line)
                    # and go to the next file
                    break
                # Otherwise, append this data
                station_lines.append(sdata)

        if not station_lines:
            continue
        print('Station', snum_str)
        print(station_lines)

输出

1 data110
3 data130
4 data140

0 data200
1 data210
2 data220
6 data260

0 data300
4 data340
6 data360

Station 0
['data200', 'data300']
Station 1
['data110', 'data210']
Station 2
['data220']
Station 3
['data130']
Station 4
['data140', 'data340']
Station 6
['data260', 'data360']

【讨论】:

  • 非常感谢@PM_2Ring 这段代码看起来很棒而且很聪明,但我想知道你为什么在 str_count 生成器中将站号转换为字符串?如果我想迭代原始站数 100797 怎么办,因为这三个文件中已经缺少一些站号(还有更多文件包含丢失站的其他数据,但我想处理这三个温度文件仅限。)
  • @MohammadElNesr 我将站号转换为str_count 生成器中的字符串,因为我们需要为我们读取的每一行测试站号字符串,并且将这些数字字符串与字符串而不是将每个字符串转换为整数来进行比较。而且我认为最好在生成器中进行这种转换,而不是用站号整数和站号字符串来混乱主循环。
  • @MohammadElNesr 在开始编写此代码之前,我询问“这三个文件中的每一个是否都包含range(100798) 中每个站号的数据”,您回答说这是正确的。如果不正确,我需要稍微更改逻辑。但是在我的时区已经很晚了,我可能要到明天才有时间进行更改。
  • 不需要@PM_ 2Ring 做进一步的事情。因为我已经改变了需要的东西,并且代码现在运行起来就像一个魅力。非常感谢您的努力。
  • @MohammadElNesr 我已经添加了一个新版本来处理丢失的车站,您只需要指定车站编号范围。
【解决方案2】:

我建议使用 pandas.read_csv。您可以使用 skiprows 指定要跳过的行,还可以使用 nrows 根据您的文件大小使用合理数量的行来加载 这是文档的链接: http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html

【讨论】:

  • 首先,您可以使用csv 模块轻松实现这一点,不需要pandas,问题是OP 不提前知道块大小...至少我是这么解释的。
【解决方案3】:

我在@PM-2Ring 发布他的解决方案之前发布了下面的代码。 我想让这两种解决方案都处于活动状态:

依赖于 Pandas 库的 #1 解决方案(@DerFaizio)。 :

这个解决方案在 120 分钟内完成了 5450 个站点(大约 45 个站点/分钟)

import pandas as pd
skips =[1, 1, 1]  # to skip the header row forever
for station_number in range(100798):
    storage = {}
    tmax = pd.read_csv(full_paths[0], skiprows=skips[0], header=None, nrows=126000, usecols=[0, 1, 3])
    tmin = pd.read_csv(full_paths[1], skiprows=skips[1], header=None, nrows=126000, usecols=[0, 1, 3])
    tavg = pd.read_csv(full_paths[2], skiprows=skips[2], header=None, nrows=126000, usecols=[0, 1, 3])

    # tmax is at position 0
    for idx, station in enumerate(tmax[0]):
        if station == station_number:
            date_val = tmax[1][idx]
            t_val = float(tmax[3][idx])/10.
            storage[date_val] = [t_val, None, None]
            skips[0] += 1
        else:
            break
    # tmin is at position 1
    for idx, station in enumerate(tmin[0]):
        # station, date_val, _, val = lne.split(",")
        if station == station_number:
            date_val = tmin[1][idx]
            t_val = float(tmin[3][idx]) / 10.
            if date_val in storage:
                storage[date_val][1] = t_val
            else:
                storage[date_val] = [None, t_val, None]
            skips[1] += 1
        else:
            break
    # tavg is at position 2
    for idx, station in enumerate(tavg[0]):
        ...
        # similar to Tmin
        ...
        pass

    station_info = []
    for key in storage.keys():
        # do some analysis
        # Fill the list station_info 
        pass
    data_out.writerows(station_info)

以下解决方案是基于生成器的解决方案(@PM-2Ring)

此方案12分钟完成30000站(约2500站/分钟)

files = ['Tmax', 'Tmin', 'Tavg']
headers = ['Nesr_Id', 'r_Year', 'r_Month', 'r_Day', 'Tmax', 'Tmin', 'Tavg']

# A file buffer that you can push lines back to
def file_buff(fh):
    prev = None
    while True:
        while prev:
            yield prev
            prev = yield prev
        prev = yield next(fh)

# An infinite counter that yields numbers converted to strings
def str_count(start=0):
    n = start
    while True:
        yield str(n)
        n += 1

# NULL = -999.99
print "Time started: {}".format(time.strftime('%Y-%m-%d %H:%M:%S'))
with open('Results\\GHCN_Daily\\Important\\Temp_All_out_gen.csv', 'wb+') as out_file:
    data_out = csv.writer(out_file, quoting=csv.QUOTE_NONE, quotechar='', delimiter=',', escapechar='\\',
                          lineterminator='\n')
    data_out.writerow(headers)
    full_paths = [os.path.join(source, '{}.csv'.format(file_name)) for file_name in files]
    # Extract station data from all 3 files
    with open(full_paths[0]) as f1, open(full_paths[1]) as f2, open(full_paths[0]) as f3:
        fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3)

        for snum_str in str_count():
            # station_lines = []
            storage ={}
            count = [0, 0, 0]
            for file_id, fb in enumerate((fb1, fb2, fb3)):
                for line in fb:
                    if not isinstance(get__proper_data_type(line.split(",")[0]), str):
                        # Extract station number string & station data
                        sid, date_val, _dummy, sdata = line.split(",")
                        if sid != snum_str:
                            # This line contains data for the next station,
                            # so push it back to the buffer
                            rc = fb.send(line)
                            # and go to the next file
                            break
                        # Otherwise, append this data
                        sdata = float(sdata) / 10.
                        count[file_id] += 1
                        if date_val in storage:
                            storage[date_val][file_id] = sdata
                        else:
                            storage[date_val]= [sdata, None, None]
                        # station_lines.append(sdata)

            # # Process all the data lines for this station
            # if not station_lines:
            #     # There's no more data to process
            #     break
            print "St# {:6d}/100797. Time: {}. Tx({}), Tn({}), Ta({}) ".\
                format(int(snum_str), time.strftime('%H:%M:%S'), count[0], count[1], count[2])
            # print(station_lines)

            station_info = []
            for key in storage.keys():
                # key_val = storage[key]
                tx, tn, ta = storage[key]
                if ta is None:
                    if tx != None and tn != None:
                        ta = round((tx + tn) / 2., 1)
                if tx is None:
                    if tn != None and ta != None:
                        tx = round(2. * ta - tn, 1)
                if tn is None:
                    if tx != None and ta != None:
                        tn = round(2. * ta - tx, 1)
                # print key,
                py_date = from_excel_ordinal(int(key))
                # print py_date
                station_info.append([snum_str, py_date.year, py_date.month, py_date.day, tx, tn, ta])

            data_out.writerows(station_info)
            del station_info

谢谢大家。

【讨论】:

  • for key in storage.keys(): 在 Python 2 中效率低下。它必须先构建一个字典键列表,然后才能开始迭代它们。您可以使用for key in storage: 直接遍历键。在 Python 3 中没关系,因为 dict.keys() 返回一个动态 View 对象(它类似于集合并且适应底层 dict 的变化),而不是一个列表,但写 for key in storage: 仍然更干净。
【解决方案4】:

使用内置的 csv 模块,您可以执行以下操作:

with open(csvfile, 'r') as f:
    reader = csv.reader(f, delimiter=',')
    for i in range(n):
        reader.next()
    for row in reader:
        print row #  Or whatever you want to do here

其中 n 是您要跳过的行数。

【讨论】:

  • 在 csv 阅读器上应用 next,因为如果存在多行行,您的代码将惨遭失败。这似乎无关紧要,因为 OP 没有使用 csv 模块,但仍然。
  • 另外,您的解决方案对于 OP 来说还不够好。他想定位在正确的线路上,而不是从一开始就读取线路。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-04
  • 2018-06-23
  • 1970-01-01
  • 1970-01-01
  • 2011-03-29
  • 2020-11-12
相关资源
最近更新 更多