【问题标题】:Process dask dataframe by chunks of rows按行块处理 dask 数据帧
【发布时间】:2021-01-21 17:06:50
【问题描述】:

我有一个使用某个 blocksize 的块创建的 dask 数据框:

df = dd.read_csv(filepath, blocksize = blocksize * 1024 * 1024)

我可以像这样分块处理它:

partial_results = []
for partition in df.partitions:
    partial = trivial_func(partition[var])
    partial_results.append(partial)
result = delayed(sum)(partial_results)

(这里我尝试使用map_partitions,但最终只使用了for 循环)。直到这部分一切正常。

现在,我需要对相同的数据运行一个函数,但是这个函数需要接收一定数量的数据帧(例如rows_per_chunk=60),这可以实现吗?有了熊猫,我会这样做:

partial_results = []
for i in range(int(len_df/rows_per_chunk)): # I think ceil would be better if decimal
    arg_data = df.iloc[i*rows_per_chunk:(i+1)*rows_per_chunk]
    partial = not_so_trivial_func(arg_data)
    partial_results.append(partial)
result = sum(partial_results)

是否可以用 dask 做这样的事情?我知道由于延迟评估,无法使用iloc,但是否可以以不同的方式对数据帧进行分区?如果不是,那么使用 dask 实现这一目标的最有效方法是什么?数据框有数百万行。

【问题讨论】:

  • 你需要这些块来表示连续的行吗? (这将发生在pandas
  • 是的,前n行属于某个组,需要与后n行分开处理,以此类推。

标签: python pandas dask


【解决方案1】:

您可以沿着定义如何跨分区分配索引值的分区重新分区数据帧(假设唯一索引)。

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(range(15), columns=['x'])
ddf = dd.from_pandas(df, npartitions=3)

# there will 5 rows per partition
print(ddf.map_partitions(len).compute())

# you can see that ddf is split along these index values
print(ddf.divisions)

# change the divisions to have the desired spacing
new_divisions = (0, 3, 6, 9, 12, 14)
new_ddf = ddf.repartition(divisions=new_divisions)

# now there will be 3 rows per partition
print(new_ddf.map_partitions(len).compute())

如果索引未知,则可以创建一个新索引(假设行不需要排序)并沿计算的分区重新分区:

import dask.dataframe as dd
import pandas as pd

# save some data into unindexed csv
num_rows = 15
df = pd.DataFrame(range(num_rows), columns=['x'])
df.to_csv('dask_test.csv', index=False)


# read from csv
ddf = dd.read_csv('dask_test.csv', blocksize=10)

# assume that rows are already ordered (so no sorting is needed)
# then can modify the index using the lengths of partitions
cumlens = ddf.map_partitions(len).compute().cumsum()

# since processing will be done on a partition-by-partition basis, save them
# individually
new_partitions = [ddf.partitions[0]]
for npart, partition in enumerate(ddf.partitions[1:].partitions):
    partition.index = partition.index + cumlens[npart]
    new_partitions.append(partition)

# this is our new ddf
ddf = dd.concat(new_partitions)

#  set divisions based on cumulative lengths
ddf.divisions = tuple([0] + cumlens.tolist())

# change the divisions to have the desired spacing
new_partition_size = 12
max_rows = cumlens.tolist()[-1]
new_divisions = list(range(0, max_rows, new_partition_size))
if new_divisions[-1]<max_rows:
    new_divisions.append(max_rows)
new_ddf = ddf.repartition(divisions=new_divisions)

# now there will be desired rows per partition
print(new_ddf.map_partitions(len).compute())

【讨论】:

  • 嗨@SultanOrazbayev,我尝试了您的解决方案并且您的代码有效,但不适用于我的情况。问题是,以我拥有的数据量,我无法先创建 Pandas 数据框。
  • 我得到了left side of old and new divisions are different,因为ddf.divisions 返回一个只有None 的元组。
  • 我明白了,您是从 parquet 还是 csv 加载?您的原始数据似乎未编入索引。
  • 我正在从 CSV 加载,事实上,我的数据没有被索引。 dask 是否总是需要索引数据?
  • 某些操作需要索引,而不是全部。请参阅更新的示例。我希望这行得通。
猜你喜欢
  • 2010-11-26
  • 2020-09-07
  • 1970-01-01
  • 2016-07-16
  • 2022-11-10
  • 2021-09-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多