【问题标题】:Spark: Selecting only specific partitionsSpark:仅选择特定分区
【发布时间】:2018-03-24 09:37:25
【问题描述】:

我的 RDBMS 数据库中有一个巨大的表,其中包含不同帐户类型的记录。 我将这些数据加载到 spark 一次,然后会根据帐户类型在此表上循环几次以生成一些报告。

我已在此表上创建了一个临时全局视图。

df.createOrReplaceGlobalTempView("account_tbl")

现在,我想根据 account_type 列将这个视图划分为多个分区,其中数据被分成块,所以每次我在这个视图上循环一个 account_type 时,我只想选择那个特定的分区。

在该特定列上重新分区可以轻松解决这个问题吗?如果是,我是否需要重新分区 df 然后创建一个全局临时视图,或者我不确定。 而且,我如何确保每次循环时只选择那个特定的分区。 请指教。

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-dataframe apache-spark-dataset


    【解决方案1】:

    例如,您可以使用df.repartition(partition_size, col("account_type"))。在这里,我设置了分区大小和要分区的列。否则,如果您想使用 Spark SQL,您可以使用:

    SET spark.sql.shuffle.partitions = partition_size
    SELECT * FROM df CLUSTER BY account_type
    

    CLUSTER BY 与重新分区类似,但它也会对您的数据框进行排序。

    使用与下一个类似的代码访问每个分区内的数据:

    df.foreachPartition {
          p => /*your code goes here*/
    }
    

    您可以在其中进行所有计算并生成所需的报告。

    要估计分区大小,您可以从默认值开始,例如:200,如果在 shuffle 期间出现 Out Of Memory 异常,您可以增加分区数,例如 1024,直到您的作业成功执行。没有标准方法来计算分区的确切数量,因为它取决于几个因素,例如集群的大小(可用内核、内存)和数据大小。

    此外,由于 RDD 在集群中随机分布,因此无法确定您的分区将包含哪些数据。确定这一点的唯一方法是在 foreachPartition 中使用自定义过滤器。例如,您可以将自定义条件应用为下一个:

    df.foreachPartition( iter => {
            iter.foreach { i =>
              if(i.some_column == "somevalue")
                //write populate data
    
            }
        })
    

    祝你好运

    【讨论】:

    • @@Alexandros,感谢您的回复。只是几件事,我可以做 SparkSQL 或重新分区。 1.无论哪种情况,我如何确定partition_size? 2. 如何验证只选择了那个分区? 3. 重新分区后,我还需要加入其他数据框。所以我不知道如何继续使用 foreachPartition 选项?请指教
    • @@Alexandros,感谢您的及时回复。赞成:)
    • 谢谢@1pluszara :) 我很想知道这是否适合你。如果你成功了,请告诉我
    猜你喜欢
    • 2012-12-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-04
    • 2018-09-17
    • 1970-01-01
    • 2022-11-09
    • 2019-01-12
    相关资源
    最近更新 更多