[Posted]: 2019-08-16 05:18:00
[Problem description]:
I am using Dask to implement the following logic:
- read a master delayed dd.DataFrame from multiple input files (one pd.DataFrame per file)
- run several query calls on the master delayed DataFrame
- save every DataFrame produced by the DataFrame.query calls with DataFrame.to_hdf
If I use compute=False in my to_hdf calls and feed the list of Delayeds returned by each to_hdf call to dask.compute, I get a crash/segfault. (If I omit compute=False, everything runs fine.) Some googling pointed me toward locks; I tried adding a dask.distributed.Client and feeding a dask.distributed.Lock to to_hdf, as well as a dask.utils.SerializableLock, but I could not resolve the crash.
The flow looks like this:
import uproot
import dask
import dask.dataframe as dd
from dask.delayed import delayed
def delayed_frame(files, tree_name):
    """Create a master delayed DataFrame from multiple files."""
    @delayed
    def single_frame(file_name, tree_name):
        """Read an external file, convert to pandas.DataFrame, return it."""
        tree = uproot.open(file_name).get(tree_name)
        return tree.pandas.df()  ## this is the pd.DataFrame
    return dd.from_delayed([single_frame(f, tree_name) for f in files])

def save_selected_frames(df, selections, prefix):
    """Perform queries on a delayed DataFrame and save HDF5 output."""
    queries = {sel_name: df.query(sel_query)
               for sel_name, sel_query in selections.items()}
    computes = []
    for dfname, df in queries.items():
        outname = f"{prefix}_{dfname}.h5"
        computes.append(df.to_hdf(outname, f"/{prefix}", compute=False))
    dask.compute(*computes)
selections = {"s1": "(A == True) & (N > 1)",
              "s2": "(B == True) & (N > 2)",
              "s3": "(C == True) & (N > 3)"}
from glob import glob
df = delayed_frame(glob("/path/to/files/*.root"), "selected")
save_selected_frames(df, selections, "selected")
## expect output files:
## - selected_s1.h5
## - selected_s2.h5
## - selected_s3.h5
[Discussion]:
Tags: dask dask-distributed dask-delayed