【问题标题】:Is Spark zipWithIndex safe with parallel implementation?Spark zipWithIndex 在并行实现中是否安全?
【发布时间】:2015-08-06 03:16:37
【问题描述】:

如果我有一个文件,并且我每行做了一个 RDD zipWithIndex,

([row1, id1001, name, address], 0)
([row2, id1001, name, address], 1)
...
([row100000, id1001, name, address], 100000)

如果我重新加载文件,我能否获得相同的索引顺序?既然是并行运行的,其他行可能分区不同?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    RDDs 可以排序,所以也有顺序。此顺序用于创建带有.zipWithIndex() 的索引。

    每次都获得相同的顺序取决于之前的调用在您的程序中执行的操作。文档提到.groupBy() 可以破坏订单或生成不同的订单。可能还有其他调用也可以执行此操作。

    如果您需要保证特定的顺序,我想您总是可以在致电.zipWithIndex() 之前致电.sortBy()

    这在.zipWithIndex() scala API docs中有解释

    public RDD<scala.Tuple2<T,Object>> zipWithIndex() 压缩这个 RDD 它的元素索引。排序首先基于分区 索引,然后是每个分区中项目的排序。所以 第一个分区中的第一项获取索引 0,最后一项 最后一个分区接收最大的索引。这类似于 Scala 的 zipWithIndex 但它使用 Long 而不是 Int 作为索引 类型。此方法需要在此 RDD 包含时触发 spark 作业 多个分区。

    请注意,某些 RDD,例如由 groupBy() 返回的 RDD,不 保证分区中元素的顺序。分配给每个的索引 因此,不保证元素,如果 RDD 为 重新评估。如果需要固定订购以保证相同 索引分配,您应该使用 sortByKey() 对 RDD 进行排序或保存 到一个文件。

    【讨论】:

    • 在 RDD 上使用 sortBy 会将其收集到驱动程序中,对吗?恐怕这可能会导致OOME。我想要的排序顺序只是文件中行的默认顺序。
    • @sophie 排序是在工作人员中完成的,而不是在驱动程序中完成的。如果在阅读 API 文档后,您不确定会发生什么,那么您应该通过运行它几次来测试它,并在某些索引号处抽查元素。您可以在不将所有数据加载到驱动程序的情况下执行此操作,方法是将 .filter() 与匿名函数一起使用,当行号与某些特定行(如第 43 行)匹配时产生 true ,然后使用 .take(1) 到将那条数据带给司机。
    猜你喜欢
    • 2015-10-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-09-08
    • 1970-01-01
    • 1970-01-01
    • 2014-04-27
    • 2020-03-07
    相关资源
    最近更新 更多