【发布时间】:2014-05-11 17:15:56
【问题描述】:
我有一个大型(约 1.6 亿行)数据框,我已将其存储到磁盘中,如下所示:
def fillStore(store, tablename):
files = glob.glob('201312*.csv')
names = ["ts", "c_id", "f_id","resp_id","resp_len", "s_id"]
for f in files:
df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
store.append(tablename, df, format='table', data_columns=['c_id','f_id'])
该表有一个时间索引,除了时间之外,我将使用c_id 和f_id 进行查询(通过索引)。
我有另一个包含约 18000 个“事件”的数据框。每个事件都包含一些(少至数百,多至数十万)个人记录。我需要为每个事件收集一些简单的统计数据并将它们存储起来,以便收集一些汇总统计数据。目前我这样做:
def makeQueryString(c, f, start, stop):
return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(c, f , str(pd.to_datetime(start)),str(pd.to_datetime(stop)))
def getIncidents(inc_times, store, tablename):
incidents = pd.DataFrame(columns = ['c_id','f_id','resp_id','resp_len','s_id','incident_id'])
for ind, row in inc_times.iterrows():
incidents = incidents.append(store.select(tablename,
makeQueryString(row.c_id,
row.f_id,
row.start,
row.stop))).fillna(ind)
return incidents
这一切都很好,除了每个store.select() 语句大约需要 5 秒,这意味着处理整个月的数据需要 24-30 小时的处理时间。同时,我需要的实际统计数据比较简单:
def getIncidentStats(df):
incLen = (df.index[-1]-df.index[0]).total_seconds()
if incLen == 0:
incLen = .1
rqsts = len(df)
rqstRate_s = rqsts/incLen
return pd.Series({'c_id':df.c_id[0],
'f_id':df.fqdn_id[0],
'Length_sec':incLen,
'num_rqsts':rqsts,
'rqst_rate':rqstRate_s,
'avg_resp_size':df.response_len.mean(),
'std_resp_size':df.response_len.std()})
incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)
我的问题是:我如何才能提高此工作流程任何部分的性能或效率?(请注意,我实际上是批量处理大部分工作以获取和存储事件的某一天时间只是因为我想限制在崩溃时丢失已处理数据的风险。为了简单起见,我把这段代码放在这里,因为我实际上需要处理整个月的数据。)
有没有办法在我从商店收到数据时对其进行处理,这有什么好处吗? 我会从使用 store.select_as_index 中受益吗?如果我收到索引,我仍然需要访问数据以获取正确的统计信息吗?
其他注意事项/问题:我比较了将 HDFStore 存储在 SSD 和普通硬盘驱动器上的性能,并没有发现 SSD 有任何改进。这是预期的吗?
我还玩弄了创建大量查询字符串并同时请求它们的想法。当总查询字符串过大(约 5-10 个查询)时,这会导致内存错误。
编辑 1 如果重要的话,我使用的是 3.1.0 版的表格和 0.13.1 版的熊猫
编辑 2 以下是更多信息:
ptdump -av store.h5
/ (RootGroup) ''
/._v_attrs (AttributeSet), 4 attributes:
[CLASS := 'GROUP',
PYTABLES_FORMAT_VERSION := '2.0',
TITLE := '',
VERSION := '1.0']
/all_recs (Group) ''
/all_recs._v_attrs (AttributeSet), 14 attributes:
[CLASS := 'GROUP',
TITLE := '',
VERSION := '1.0',
data_columns := ['c_id', 'f_id'],
encoding := None,
index_cols := [(0, 'index')],
info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
levels := 1,
nan_rep := 'nan',
non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
pandas_type := 'frame_table',
pandas_version := '0.10.1',
table_type := 'appendable_frame',
values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
description := {
"index": Int64Col(shape=(), dflt=0, pos=0),
"values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
"c_id": Int64Col(shape=(), dflt=0, pos=2),
"f_id": Int64Col(shape=(), dflt=0, pos=3)}
byteorder := 'little'
chunkshape := (5461,)
autoindex := True
colindexes := {
"index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
/all_recs/table._v_attrs (AttributeSet), 19 attributes:
[CLASS := 'TABLE',
FIELD_0_FILL := 0,
FIELD_0_NAME := 'index',
FIELD_1_FILL := 0,
FIELD_1_NAME := 'values_block_0',
FIELD_2_FILL := 0,
FIELD_2_NAME := 'c_id',
FIELD_3_FILL := 0,
FIELD_3_NAME := 'f_id',
NROWS := 161738653,
TITLE := '',
VERSION := '2.6',
client_id_dtype := 'int64',
client_id_kind := ['c_id'],
fqdn_id_dtype := 'int64',
fqdn_id_kind := ['f_id'],
index_kind := 'datetime64',
values_block_0_dtype := 'int64',
values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]
以下是主表和 inc_times 的示例:
In [12]: df.head()
Out[12]:
c_id f_id resp_id resp_len \
ts
2013-12-04 08:00:00 637092486 5372764353 30 56767543
2013-12-04 08:00:01 637092486 5399580619 23 61605423
2013-12-04 08:00:04 5456242 5385485460 21 46742687
2013-12-04 08:00:04 5456242 5385485460 21 49909681
2013-12-04 08:00:04 624791800 5373236646 14 70461449
s_id
ts
2013-12-04 08:00:00 1829
2013-12-04 08:00:01 1724
2013-12-04 08:00:04 1679
2013-12-04 08:00:04 1874
2013-12-04 08:00:04 1727
[5 rows x 5 columns]
In [13]: inc_times.head()
Out[13]:
c_id f_id start stop
0 7254 196211 1385880945000000000 1385880960000000000
1 9286 196211 1387259840000000000 1387259850000000000
2 16032 196211 1387743730000000000 1387743735000000000
3 19793 196211 1386208175000000000 1386208200000000000
4 19793 196211 1386211800000000000 1386211810000000000
[5 rows x 4 columns]
关于 c_id 和 f_id,我想从完整存储中选择的 ID 集合与存储中的 ID 总数相比相对较少。换句话说,inc_times中有一些流行的ID,我会重复查询,而完全忽略全表中存在的一些ID。我估计我关心的 ID 大约占总 ID 的 10%,但这些是最受欢迎的 ID,因此它们的记录在整个集合中占主导地位。
我有 16GB 内存。完整存储为 7.4G,完整数据集(作为 csv 文件)仅为 8.7 GB。最初,我相信我能够将整个内容加载到内存中,并且至少可以对其进行一些有限的操作,但是在加载整个内容时出现内存错误。因此,将其批处理为每日文件(完整文件由一个月的数据组成)。
【问题讨论】:
-
你能在 hdf 文件上张贴
ptdump -av -
您能否发布您存储的数据样本以及inc_times 样本
-
c_id 和 f_id 的相对频率是多少,它们是相对独特还是很常见,您每次选择的范围有多大(例如时间戳范围)
-
os / 你有多少主内存可用,存储的文件有多大,以 GB 为单位?
-
您能否为单个选择发布 %prun(例如,使用单个 makeQueryString)。并请在您编辑时发表评论(我会这样收到一条消息)。
标签: python pandas hdfs large-data