[Question title]: Spark DataFrame partitioning
[Posted]: 2020-09-08 07:04:59
[Question]:

Currently I have one DataFrame. I want to split it into several independent DataFrames and then process them one by one.

The Spark DataFrame looks like this:

+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|            id|data_identifier_method|       start_time|         end_time|time_interval|             time|    value|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:10|351232.92|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:10|351232.92|
|  fd784213423f|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:00|342342.12|
|  fd784213423f|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:05|342421.88|
|  fd784213423f|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:10|351232.92|
|  fd784213423f|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:00|342342.12|
|  fd784213423f|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:05|342421.88|
|  fd784213423f|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:10|351232.92|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+

Then I want to split it into these four DataFrames:

+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|            id|data_identifier_method|       start_time|         end_time|time_interval|             time|    value|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:10|351232.92|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|            id|data_identifier_method|       start_time|         end_time|time_interval|             time|    value|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:00|342342.12|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:05|342421.88|
|fd78sfsdfsd8vs|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:10|351232.92|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|            id|data_identifier_method|       start_time|         end_time|time_interval|             time|    value|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|  fd784213423f|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:00|342342.12|
|  fd784213423f|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:05|342421.88|
|  fd784213423f|  algid1_set1_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:10|351232.92|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|            id|data_identifier_method|       start_time|         end_time|time_interval|             time|    value|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+
|  fd784213423f|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:00|342342.12|
|  fd784213423f|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:05|342421.88|
|  fd784213423f|  algid2_set2_total...|20200903 00:00:00|20200903 00:00:10|            5|20200903 00:00:10|351232.92|
+--------------+----------------------+-----------------+-----------------+-------------+-----------------+---------+

How can I do this?

In other words, without splitting the original DataFrame, how can I operate on these four groups inside it?

[Question comments]:

  • What about filter?
  • How?
  • I suggest you read one of the many existing tutorials on basic DataFrame operations, for example this one: docs.databricks.com/spark/latest/dataframes-datasets/…
  • If you still run into problems, come back here and show what you tried and what did not work as expected.

Tags: dataframe apache-spark apache-spark-sql


[Solution 1]:

You can achieve this with NTILE.

NTILE is a Spark SQL analytic function. It divides an ordered data set into the number of buckets indicated by expr and assigns the appropriate bucket number to each row. The buckets are numbered 1 through expr. For each partition, the expr value must resolve to a positive constant.

Each row then carries a bucket number (the NTILE value), and you can use a filter to extract the subset of rows for each bucket that the ntile function assigned.

Below is a dummy example:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.ntile
val w = Window.orderBy("sum_val")
val resultDF = x.orderBy("sum_val").select(x("id"), x("sum_val"), ntile(4).over(w).alias("ntile"))

+---+-------+-----+
| id|sum_val|ntile|
+---+-------+-----+
|  B|      3|    1|
|  E|      4|    1|
|  H|      4|    2|
|  D|     14|    2|
|  A|     14|    3|
|  F|     30|    3|
|  C|     34|    4|
+---+-------+-----+
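Once the bucket number is materialized, each bucket can be pulled out with a plain filter and processed in turn. A minimal sketch, assuming the bucket column is aliased as ntile as in the output above:

import org.apache.spark.sql.functions.col

// Process each of the four buckets independently.
for (bucket <- 1 to 4) {
  val bucketDF = resultDF.where(col("ntile") === bucket)
  bucketDF.show()  // stand-in for the actual per-bucket processing
}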

[Comments]:

[Solution 2]:

"I want to split it into several independent DataFrames and then process them one by one."

What problem are you actually trying to solve? There are many ways to accomplish something like this; which one fits depends on your requirements, and each has very different memory and performance characteristics.

1. Use table partitioning. All output files end up under the same folder path, but technically each partition can contain multiple files. Optionally sort by the partition column to minimize the number of output files (a sketch follows the write below).

    dataframe.write.partitionBy("partition_col").parquet("s3://bucket/path/")
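If minimizing the number of output files is the goal, one common variant (a sketch, not taken from the original answer) is to repartition by the same column before the partitioned write, so each partition_col value is written by a single task:

import org.apache.spark.sql.functions.col

// Bring each partition_col value into a single task before writing,
// so every output folder ends up with roughly one file.
dataframe
  .repartition(col("partition_col"))
  .write
  .partitionBy("partition_col")
  .parquet("s3://bucket/path/")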

2. Use hash repartitioning to put all records with the same key in the same partition, keeping in mind that multiple keys can end up in the same partition (see the processing sketch below).

dataframe.repartition(col("partition_col"))...
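What the per-partition processing looks like after the repartition is up to you; a hedged sketch using foreachPartition (the println is only a placeholder):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

// Co-locate all rows with the same key, then handle each partition as a unit.
val repartitioned = dataframe.repartition(col("partition_col"))

repartitioned.foreachPartition { (rows: Iterator[Row]) =>
  // All rows for a given partition_col value arrive in this iterator,
  // though one partition may still hold more than one key.
  rows.foreach(println) // stand-in for the real per-group processing
}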

3. Collect the partition-column values and then run an independent job for each value. This lets you force everything for one key into a single partition, but it is much slower than the previous two options and involves re-reading many records and cramming them into one partition, which can cause both performance and memory problems.

import org.apache.spark.sql.functions.{col, lit}

// Collect the distinct key values to the driver, then filter and process per key.
val partitionColVals = dataframe.select("partition_col").distinct.collect.map(_.get(0))
for (partitionColVal <- partitionColVals) {
  dataframe.where(col("partition_col") === lit(partitionColVal)).repartition(1)...
}
    
4. Window functions - this is a SQL concept that lets you do operations on sets of matching keys, kind of like a high-powered group by. What to use will depend on what you're trying to do (a sketch follows).
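For the question's data, a minimal window sketch could look like the following; the aggregate chosen (a per-group sum of value over id and data_identifier_method) is only an illustrative assumption:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// One window per (id, data_identifier_method) group.
val groupWindow = Window.partitionBy("id", "data_identifier_method")

// Operate on every group at once without ever splitting the DataFrame.
val withGroupSum = dataframe.withColumn("group_value_sum", sum(col("value")).over(groupWindow))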
    

[Comments]:
