【问题标题】:Joining a large and a massive spark dataframe加入一个庞大的火花数据框
【发布时间】:2018-09-30 16:06:25
【问题描述】:

我的问题如下:

  • 我有一个名为 details 的大型数据框,其中包含 900K 行,另一个包含 80M 行,名为 attributes

  • 两者都有一个列A,我想在该列上进行左外连接,左侧数据框为deatils

  • 在数据框 details 的列 A 中只有 75K 唯一条目。数据框attributes 80M 列A 中的唯一条目。

实现join 操作的最佳方法是什么?

我尝试了什么?

  • 简单的连接,即details.join(attributes, "A", how="left_outer") 只是超时(或内存不足)。

  • 由于details 中的A 列中只有75K 唯一条目,我们不关心attributes 中数据框中的其余部分。因此,首先我使用以下过滤器进行过滤:

    uniqueA = details.select('A').distinct().collect()
    uniqueA = map(lambda x: x.A, uniqueA)
    attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))
    

    我认为这会成功,因为 attributes 表从 80M 行减少到仅 75K 行。但是,完成join 仍然需要很长时间(而且永远不会完成)。

  • 接下来,我以为分区太多了,要join的数据不在同一个分区上。虽然,我不知道如何将所有数据带到同一个分区,但我认为重新分区可能会有所帮助。就这样吧。

    details_repartitioned = details.repartition("A")
    attributes_repartitioned = attributes.repartition("A")
    
  • 上述操作将attributes中的分区数从70K减少到200个。details中的分区数约为1100。

    details_attributes = details_repartitioned.join(broadcast(
    attributes_repartitioned), "A", how='left_outer')  # tried without broadcast too
    

毕竟,join 仍然不起作用。我仍在学习 PySpark,所以我可能误解了重新分区背后的基本原理。如果有人能阐明这一点,那就太好了。

附:我已经看过this 的问题,但这并不能回答这个问题。

【问题讨论】:

    标签: python apache-spark dataframe pyspark bigdata


    【解决方案1】:

    详细信息表在 A 列中有 900k 个项目,其中有 75k 个不同的条目。我认为您尝试过的 A 列上的过滤器是正确的方向。但是,collect 和随后的map 操作

    attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA)) 
    

    这太贵了。另一种方法是

    uniqueA = details.select('A').distinct().persist(StorageLevel.DISK_ONLY)
    uniqueA.count // Breaking the DAG lineage
    attrJoined = attributes.join(uniqueA, "inner")
    

    另外,如果您还没有正确设置随机分区,您可能需要正确设置。

    您的数据集中可能会出现一个问题,那就是偏斜。它可能发生在 75k 个唯一值中,只有少数几个与属性表中的大量行连接。在这种情况下,连接可能需要更长的时间并且可能无法完成。

    要解决这个问题,您需要找到 A 列的倾斜值并单独处理它们。

    【讨论】:

    • 您能否详细说明 是什么意思以及如何做到这一点?
    • 您可以在进行 spark-submit 时执行此操作。你可以做 --conf spark.sql.shuffle.partitions=5000 或者你可以从 spark sql 设置它。 spark.sql("设置 spark.sql.shuffle.partitions=5000")
    • 谢谢。有用。可能真正的技巧是将分区设置为 5000。现在,我想看看过滤将 77M 降低到 70K 是否真的有必要。还有一个问题是,要打破 DAG 血统,做uniqueA.count 是否足够,还是必须调用诸如uniqueA.count() 之类的计数?
    • 是的,打破 DAG 是最重要的。要破坏 DAG,您需要对 Dataframe 执行操作,例如计数、收集、保存等。因为除了操作之外,spark 中的所有操作都是惰性的。我发现计数比显式保存更容易。不确定是否有任何替代方法可以实现这一目标
    • 可能是因为如果一个长 DAG 底部的阶段失败了; spark 需要重新计算整个 DAG。这会产生后续故障并最终导致作业失败。但是,如果我们破坏 DAG ,任何阶段的失败都不需要完全重新计算。这可能是一个原因,尽管我不确定。
    猜你喜欢
    • 2016-06-27
    • 1970-01-01
    • 2019-07-07
    • 2021-05-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-22
    • 1970-01-01
    相关资源
    最近更新 更多