【发布时间】:2021-06-12 19:27:37
【问题描述】:
我一直在尝试调整我的代码以利用 Dask 来利用多台机器进行处理。虽然初始数据加载并不耗时,但后续处理在 8 核 i5 上大约需要 12 小时。这并不理想,并且认为使用 Dask 帮助将处理分散到机器上会是有益的。以下代码适用于标准 Pandas 方法:
import pandas as pd
artists = pd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")
artists["name"] = artists["name"].astype("str")
artists["name"] = (
artists["name"]
.str.encode("ascii", "ignore")
.str.decode("ascii")
.str.lower()
.str.replace("&", " and ", regex=False)
.str.strip()
)
转换为 Dask 似乎很简单,但我在此过程中遇到了麻烦。以下适用于 Dask 的代码会引发 ValueError: cannot reindex from a duplicate axis 错误:
import dask.dataframe as dd
from dask.distributed import Client
artists = dd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")
artists["name"] = artists["name"].astype(str).compute()
artists["name"] = (
artists["name"]
.str.encode("ascii", "ignore")
.str.decode("ascii")
.str.lower()
.str.replace("&", " and ", regex=False)
.str.strip().compute()
)
if __name__ == '__main__':
client = Client()
我能看出的最好的一点是 Dask 不允许重新分配给现有的 Dask DataFrame。所以这行得通:
...
artists_new = artists["name"].astype("str").compute()
...
但是,我真的不想每次都创建一个新的 DataFrame。我宁愿用新的DataFrame替换现有的DataFrame,主要是因为我在处理之前有多个数据清理步骤。
虽然教程和指南很有用,但它们非常基础,并未涵盖此类用例。
Dask DataFrames 的首选方法是什么?
【问题讨论】:
标签: python pandas dask dask-dataframe