快速 csv 文件分割
如果你有一个非常大的文件并且你必须尝试不同的分区(比如找到最好的分割方法)上面的解决方案太慢了,无法尝试。
解决此问题的另一种方法(并且非常快速)是按记录编号创建索引文件。创建一个 6867839 行和 9 Gb 的 csv 文件的索引文件大约需要 6 分钟,另外还需要 2 分钟让 joblib 将其存储在磁盘上。
如果您处理的是 3 Gb 或更大的大文件,这种方法尤其令人印象深刻。
创建索引文件的代码如下:
# Usage:
# creaidx.py filename.csv
# indexes a csv file by record number. This can be used to
# access any record directly or to split a file without the
# need of reading it all. The index file is joblib-stored as
# filename.index
# filename.csv is the file to create index for
import os,sys,joblib
BLKSIZE=512
def checkopen(s,m='r',bz=None):
if os.access(s,os.F_OK):
if bz==None:
return open(s,m) # returns open file
else:
return open(s,m,bz) # returns open file with buffer size
else:
return None
def get_blk():
global ix,off,blk,buff
while True: # dealing with special cases
if ix==0:
n=0
break
if buff[0]==b'\r':
n=2
off=0
break
if off==BLKSIZE-2:
n=0
off=0
break
if off==BLKSIZE-1:
n=0
off=1
break
n=2
off=buff.find(b'\r')
break
while (off>=0 and off<BLKSIZE-2):
idx.append([ix,blk,off+n])
# g.write('{},{},{}\n'.format(ix,blk,off+n))
print(ix,end='\r')
n=2
ix+=1
off= buff.find(b'\r',off+2)
def crea_idx():
global buff,blk
buff=f.read(BLKSIZE)
while len(buff)==BLKSIZE:
get_blk()
buff=f.read(BLKSIZE)
blk+=1
get_blk()
idx[-1][2]=-1
return
if len(sys.argv)==1:
sys.exit("Need to provide a csv filename!")
ix=0
blk=0
off=0
idx=[]
buff=b'0'
s=sys.argv[1]
f=checkopen(s,'rb')
idxfile=s.replace('.csv','.index')
if checkopen(idxfile)==None:
with open(idxfile,'w') as g:
crea_idx()
joblib.dump(idx,idxfile)
else:
if os.path.getctime(idxfile)<os.path.getctime(s):
with open(idxfile,'w') as g:
crea_idx()
joblib.dump(idx,idxfile)
f.close()
让我们用一个玩具例子:
strings,numbers,colors
string1,1,blue
string2,2,red
string3,3,green
string4,4,yellow
索引文件将是:
[[0, 0, 0],
[1, 0, 24],
[2, 0, 40],
[3, 0, 55],
[4, 0, 72],
[5, 0, -1]]
注意最后一个索引元素处的-1,以指示在顺序访问的情况下索引文件的结尾。您可以使用这样的工具来访问 csv 文件的任何单个行:
def get_rec(n=1,binary=False):
n=1 if n<0 else n+1
s=b'' if binary else ''
if len(idx)==0:return ''
if idx[n-1][2]==-1:return ''
f.seek(idx[n-1][1]*BLKSIZE+idx[n-1][2])
buff=f.read(BLKSIZE)
x=buff.find(b'\r')
while x==-1:
s=s+buff if binary else s+buff.decode()
buff=f.read(BLKSIZE)
x=buff.find(b'\r')
return s+buff[:x]+b'\r\n' if binary else s+buff[:x].decode()
索引记录的第一个字段显然是不必要的。它保留在那里用于调试目的。附带说明一下,如果您将此字段替换为 csv 记录中的任何字段并按该字段对索引文件进行排序,那么如果您使用索引字段,则您将获得 按该字段排序的 csv 文件访问 csv 文件。
现在,一旦您创建了索引文件,您只需使用文件名(已创建索引的那个)和一个介于 1 和 100 之间的数字调用以下程序,该数字将作为命令拆分文件的百分比线路参数:
start_time = time.time()
BLKSIZE=512
WSIZE=1048576 # pow(2,20) 1Mb for faster reading/writing
import sys
import joblib
from common import Drv,checkopen
ix=0
blk=0
off=0
idx=[]
buff=b'0'
if len(sys.argv)<3:
sys.exit('Argument missing!')
s=Drv+sys.argv[1]
if sys.argv[2].isnumeric():
pct=int(sys.argv[2])/100
else:
sys.exit('Bad percentage: '+sys.argv[2])
f=checkopen(s,'rb')
idxfile=s.replace('.csv','.index')
if checkopen(idxfile):
print('Loading index...')
idx=joblib.load(idxfile)
print('Done loading index.')
else:
sys.exit(idxfile+' does not exist.')
head=get_rec(0,True)
n=int(pct*(len(idx)-2))
off=idx[n+1][1]*BLKSIZE+idx[n+1][2]-len(head)-1
num=off//WSIZE
res=off%WSIZE
sout=s.replace('.csv','.part1.csv')
i=0
with open(sout,'wb') as g:
g.write(head)
f.seek(idx[1][1]*BLKSIZE+idx[1][2])
for x in range(num):
print(i,end='\r')
i+=1
buff=f.read(WSIZE)
g.write(buff)
buff=f.read(res)
g.write(buff)
print()
i=0
sout=s.replace('.csv','.part2.csv')
with open(sout,'wb') as g:
g.write(head)
f.seek(idx[n+1][1]*BLKSIZE+idx[n+1][2])
buff=f.read(WSIZE)
while len(buff)==WSIZE:
g.write(buff)
print(i,end='\r')
i+=1
buff=f.read(WSIZE)
g.write(buff)
end_time = time.time()
文件是使用 1048576 字节的块创建的。您可以使用该图来加快文件创建速度或将其定制到内存资源较少的机器上。
该文件仅在两个分区上拆分,每个分区都有原始文件的标题。改代码来做也不是太难
将文件分成两个以上的分区。
最后,从长远来看,将 6867839 行和 9 Gb 的 csv 文件拆分 50%,我花了大约 6 分钟来创建索引文件,又花了 2 分钟让 joblib 将其存储在磁盘上。拆分文件又花了 3 分钟。