【问题标题】:Dask apply with custom functionDask 应用自定义功能
【发布时间】:2020-04-15 23:42:20
【问题描述】:

我正在尝试使用 Dask,但是在分组后使用 apply 时遇到了问题。

我有一个包含大量行的 Dask DataFrame。让我们考虑以下示例

N=10000
df = pd.DataFrame({'col_1':np.random.random(N), 'col_2': np.random.random(N) })
ddf = dd.from_pandas(df, npartitions=8)

我想合并col_1 的值,并遵循here 的解决方案

bins = np.linspace(0,1,11)
labels = list(range(len(bins)-1))
ddf2 = ddf.map_partitions(test_f, 'col_1',bins,labels)

在哪里

def test_f(df,col,bins,labels):
    return df.assign(bin_num = pd.cut(df[col],bins,labels=labels))

这正如我所期望的那样工作。

现在我想取每个 bin 中的中值(取自 here

median = ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute()

有 10 个 bin,我希望 median 有 10 行,但它实际上有 80 行。数据框有 8 个分区,所以我猜想以某种方式应用正在单独处理每个分区。

但是,如果我想要平均值并使用mean

median = ddf2.groupby('bin_num')['col_1'].mean().compute()

它可以工作,输出有 10 行。

那么问题是:我做错了什么导致apply 无法以mean 的身份运行?

【问题讨论】:

    标签: python-3.x pandas dask


    【解决方案1】:

    也许这个警告是关键(Dask doc: SeriesGroupBy.apply):

    Pandas 的 groupby-apply 可用于应用任意函数,包括导致每组一行的聚合。 Dask 的 groupby-apply 将对每个分区-组对应用一次 func,因此当 func 是一个归约时,您最终会得到每个分区-组对一行。要使用 Dask 应用自定义聚合,请使用 dask.dataframe.groupby.Aggregation。

    【讨论】:

      【解决方案2】:

      你是对的!我能够在 Dask 2.11.0 上重现您的问题。好消息是有一个解决方案! Dask groupby 问题似乎专门针对类别类型(pandas.core.dtypes.dtypes.CategoricalDtype)。通过将类别列转换为另一种列类型(float、int、str),groupby 将正常工作。

      这是我复制的你的代码:

      import dask.dataframe as dd
      import pandas as pd
      import numpy as np
      
      
      def test_f(df, col, bins, labels):
          return df.assign(bin_num=pd.cut(df[col], bins, labels=labels))
      
      N = 10000
      df = pd.DataFrame({'col_1': np.random.random(N), 'col_2': np.random.random(N)})
      ddf = dd.from_pandas(df, npartitions=8)
      
      bins = np.linspace(0,1,11)
      labels = list(range(len(bins)-1))
      ddf2 = ddf.map_partitions(test_f, 'col_1', bins, labels)
      
      print(ddf2.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())
      

      打印出你提到的问题

      bin_num
      0         NaN
      1         NaN
      2         NaN
      3         NaN
      4         NaN
             ...   
      5    0.550844
      6    0.651036
      7    0.751220
      8         NaN
      9         NaN
      Name: col_1, Length: 80, dtype: float64
      

      这是我的解决方案:

      ddf3 = ddf2.copy()
      ddf3["bin_num"] = ddf3["bin_num"].astype("int")
      
      print(ddf3.groupby('bin_num')['col_1'].apply(pd.Series.median).compute())
      

      打印出来的:

      bin_num
      9    0.951369
      2    0.249150
      1    0.149563
      0    0.049897
      3    0.347906
      8    0.847819
      4    0.449029
      5    0.550608
      6    0.652778
      7    0.749922
      Name: col_1, dtype: float64
      

      @MRocklin 或 @TomAugspurger 您能否在新版本中为此创建修复程序?我认为这里有足够的可重现代码。感谢您的辛勤工作。我喜欢 Dask 并且每天都在使用它;)

      【讨论】:

      • 嗨!感谢您抽出宝贵时间对此进行调查!我在阅读 ava-punksmash 答案后选择的解决方案是 compute 数据框并使用 pandas' groupbymedian。不过,下次遇到类似情况我会执行你的建议!
      猜你喜欢
      • 1970-01-01
      • 2011-07-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-18
      相关资源
      最近更新 更多