【问题标题】:Dask dataframe: Can `set_index` put a single index into multiple partitions?Dask 数据框:`set_index` 可以将单个索引放入多个分区吗?
【发布时间】:2021-12-02 19:57:45
【问题描述】:

从经验上看,每当您在 Dask 数据帧上 set_index 时,Dask 总是会将具有相同索引的行放入单个分区中,即使它会导致分区严重不平衡。

这是一个演示:

import pandas as pd
import dask.dataframe as dd

users = [1]*1000 + [2]*1000 + [3]*1000

df = pd.DataFrame({'user': users})
ddf = dd.from_pandas(df, npartitions=1000)

ddf = ddf.set_index('user')

counts = ddf.map_partitions(lambda x: len(x)).compute()
counts.loc[counts > 0]
# 500    1000
# 999    2000
# dtype: int64

但是,我在任何地方都找不到这种行为的保证。

我曾尝试自己筛选代码但放弃了。我相信这些相互关联的功能之一可能就是答案:

当你set_index时,是不是一个索引永远不能在两个不同的分区中?如果不是,那么这个属性在什么条件下成立?


赏金:我会将赏金奖励给来自信誉良好的来源的答案。例如,引用实现以表明该属性必须保持。

【问题讨论】:

    标签: python dataframe indexing dask


    【解决方案1】:

    是不是一个索引永远不能在两个不同的分区中?

    不,当然是允许的。 Dask 甚至打算让这种情况发生。但是,由于set_index 中的bug,所有数据仍将最终存储在一个分区中。

    一个极端的例子(每一行都是相同的值,除了一个):

    In [1]: import dask.dataframe as dd
    In [2]: import pandas as pd
    In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})
    In [4]: ddf = dd.from_pandas(df, npartitions=10)
    In [5]: s = ddf.set_index("A")
    In [6]: s.divisions
    Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)
    

    如您所见,Dask 打算将0s 拆分为多个分区。然而,当 shuffle 实际发生时,所有 0s 仍然最终在一个分区中:

    In [7]: import dask
    In [8]: dask.compute(s.to_delayed())  # easy way to see the partitions separately
    Out[8]: 
    ([Empty DataFrame
      Columns: []
      Index: [],
      Empty DataFrame
      Columns: []
      Index: [],
      Empty DataFrame
      Columns: []
      Index: [],
      Empty DataFrame
      Columns: []
      Index: [],
      Empty DataFrame
      Columns: []
      Index: [],
      Empty DataFrame
      Columns: []
      Index: [],
      Empty DataFrame
      Columns: []
      Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)
    

    这是因为code 决定行属于哪个输出分区不会考虑divisions 中的重复项。将divisions 视为一个系列,它使用searchsortedside="right",因此所有数据总是在最后一个分区中结束。

    问题解决后我会更新此答案。

    【讨论】:

      【解决方案2】:

      是不是一个索引永远不能在两个不同的分区?

      IIUC,实用的答案是肯定的。

      一个 dask 数据帧通常会有多个分区,并且 dask 可能知道也可能不知道与每个分区相关联的索引值(see Partitions)。如果 dask 确实知道哪个分区包含哪个索引范围,那么这将反映在 df.divisions 输出中(如果不知道,此调用的结果将是 None)。

      在运行.set_index 时,dask 将计算除法,似乎在确定除法时,它要求除法是连续且唯一的(最后一个元素除外)。相关代码为here

      因此有两个潜在的后续问题:为什么不允许任何非顺序索引,以及作为前面的一个特定情况,为什么不允许分区中的重复索引。

      关于第一个问题:对于较小的数据,考虑允许非排序索引的设计可能是可行的,但您可以想象一般的非排序索引不会很好地扩展,因为 dask 需要以某种方式存储每个分区的索引。

      关于第二个问题:似乎这应该是可能的,但现在似乎还没有正确实施。请参阅下面的 sn-p:

      # use this to generate 10 indexed partitions
      import pandas as pd
      
      for user in range(10):
          
          df = pd.DataFrame({'user_col': [user//3]*100})
          df['user'] = df['user_col']
          df = df.set_index('user')
          df.index.name = 'user_index'
          
          df.to_parquet(f'test_{user}.parquet', index=True)
      
      
      # now load them into a dask dataframe
      import dask.dataframe as dd
      
      ddf = dd.read_parquet('test_*.parquet')
      
      # dask will know about the divisions
      print(ddf.known_divisions) # True
      
      # further evidence
      print(ddf.divisions) # (0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3)
      
      # this should show three partitions, but will show only one
      print(ddf.loc[0].npartitions) # 1
      

      【讨论】:

      • 感谢您的回答。我认为我的问题可能更清楚,因为我对 Dask 的 set_index 的行为特别感兴趣。这个答案非常相关和有趣,但没有解决这个问题。
      • 顺便说一句,对我来说,在您的示例中,部门实际上是未知的(known_divisionsFalse)。我使用的是 Dask 2021.08.01,Pandas 1.3.1 和 PyArrow 5.0.0
      • 别担心,我对 dask 内部结构没有信心。
      【解决方案3】:

      我刚刚注意到 Dask 的 shuffle 文档说

      此操作后,on 值相同的行将在同一个分区中。

      这似乎证实了我的经验观察。

      【讨论】:

        猜你喜欢
        • 2017-01-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-04-19
        • 1970-01-01
        • 2012-08-26
        • 1970-01-01
        相关资源
        最近更新 更多