您可以通过首先为大型 CSV 文件创建索引来从文件中读取 count 随机行。除非数据更改,否则只需执行一次。该索引将包含所有换行符所在文件的偏移量。
然后可以通过首先寻找所需的偏移量并读入一行来轻松读入随机行。
例如:
import random
import csv
import os
import io
def create_index(index_filename, csv_filename):
with open(csv_filename, 'rb') as f_csv:
index = 1
line_indexes = [] # Use [0] if no header
linesep = ord(os.linesep[-1])
while True:
block = f_csv.read(io.DEFAULT_BUFFER_SIZE * 1000)
if block:
block_index = 0
line_indexes.extend(offset + index for offset, c in enumerate(block) if c == linesep)
index += len(block)
else:
break
with open(index_filename, 'w') as f_index:
f_index.write('\n'.join(map(str, line_indexes)))
def get_rows(count, index_filename, csv_filename):
sys_random = random.SystemRandom()
with open(index_filename) as f_index:
line_indexes = list(map(int, f_index.read().splitlines()))
row_count = len(line_indexes)
with open(csv_filename) as f_csv:
for _ in range(count):
line_number = sys_random.randint(0, row_count-1)
f_csv.seek(line_indexes[line_number])
if line_number == row_count - 1:
line = f_csv.read()
else:
line = f_csv.read(line_indexes[line_number + 1] - line_indexes[line_number])
yield line_number, next(csv.reader(io.StringIO(line)))
index_filename = 'index.txt'
csv_filename = 'input.csv'
create_index(index_filename, csv_filename) # only needed ONCE
for row_number, row in get_rows(10, index_filename, csv_filename):
print(f"Row {row_number} {row}")
同样的想法可以用于从随机起始行读取或以随机顺序读取。
显然来回查找不会像顺序读取文件那样快,但它应该比从头开始读取要快很多。