[Question Title]: Divide Spark DataFrame rows into multiple rows depending on the size of Map type column
[Posted]: 2026-02-15 02:05:02
[Question]:

Spark DataFrame schema:

root
 |-- partition_key: string (nullable = true)
 |-- row_key: string (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- data_as_of_date: string (nullable = true)

The attributes column is of type map and can contain more than 65535 key-value pairs.

I am writing this DataFrame to a Cassandra table. The table DDL is as follows:

create table <keyspace>.<table> (
  partition_key text,
  row_key text,
  attributes map<text,text>,
  data_as_of_date text,
  PRIMARY KEY (partition_key, row_key)
  );

Problem:
In Cassandra, the attributes column of type map<text, text> cannot contain more than 65535 key-value pairs.

Question:
If the DataFrame's map-type column has more than 65535 key-value pairs, can someone help me with a Scala-Spark snippet that splits such DataFrame rows into multiple rows?

For example, if a DataFrame row has 163838 key-value pairs in the map-type column, that row should be split into 3 rows (65535 + 65535 + 32768 = 163838). row_key should have _ + a sequence number appended so that each resulting row still has a unique Cassandra composite primary key after the split. The resulting rows would look like this:

<partition_key>, <row_key>_1, <65535 key value pairs from the map>, <data_as_of_date>
<partition_key>, <row_key>_2, <65535 key value pairs from the map>, <data_as_of_date>
<partition_key>, <row_key>_3, <32768 key value pairs from the map>, <data_as_of_date>

Please use the sample DataFrame/code below. Any row whose map size is greater than 2 should be split into multiple rows.

val data = Seq(("123", "123001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("123", "123002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("123", "123003", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("456", "456001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("456", "456002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725")
    )

    val df = spark.createDataFrame(data)
      .toDF("partition_key", "row_key", "attributes", "data_as_of_date")
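
For reference, the rows that need splitting can be identified up front with Spark's built-in size function (a minimal sketch, assuming the column names above and the sample threshold of 2; the real threshold would be 65535):

    import org.apache.spark.sql.functions.{col, size}

    // Rows whose attributes map exceeds the threshold are the ones that need splitting.
    val threshold = 2  // 65535 for the real Cassandra limit
    df.filter(size(col("attributes")) > threshold).show(false)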

The output DataFrame should be as follows:

"123", "123001_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123001_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123001_3", Map("key5" -> "value5"), "20210725"
"123", "123002_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123002_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123002_3", Map("key5" -> "value5"), "20210725"
"123", "123003_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123003_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123003_3", Map("key5" -> "value5"), "20210725"
"456", "456001_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"456", "456001_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"456", "456001_3", Map("key5" -> "value5"), "20210725"
"456", "456002_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"456", "456002_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"456", "456002_3", Map("key5" -> "value5"), "20210725"
"456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725"

[Question Comments]:

    Tags: scala apache-spark cassandra


    [Solution 1]:

    Here is a solution, which consists of:

    1. posexplode the map column
    2. integer-divide the resulting pos column to derive a group column
    3. groupBy and merge the entries of each group back into a map
    4. concatenate row_key and the group column

    So I think this should give the expected result:

    val data = Seq(("123", "123001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
          (123", "123002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
          ("123", "123003", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
          ("456", "456001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
          ("456", "456002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
          ("456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725")
        )
    
    val df = data.toDF("partition_key", "row_key", "attributes", "data_as_of_date")
    
    val maxItem = 2  // maximum number of map entries per output row (65535 for the real Cassandra limit)
    
    df.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
      .withColumn("group", $"pos".divide(maxItem).cast("int"))
      .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
      .agg(collect_list(map($"key", $"value")).as("attributes"))
      .withColumn("row_key", concat($"row_key", lit("_"), $"group"))
      .select($"partition_key", $"row_key", $"attributes", $"data_as_of_date")
      .show
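
    Note: collect_list(map($"key", $"value")) yields an array<map<string,string>> rather than a single map, so the result will not directly fit the map<text,text> column in the Cassandra DDL. On Spark 2.4+, one way to fold the entries back into a single map is map_from_entries; a minimal variation of the same pipeline, assuming Spark 2.4+:

    import org.apache.spark.sql.functions._
    import spark.implicits._

    df.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
      .withColumn("group", ($"pos" / maxItem).cast("int"))
      .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
      // collect (key, value) structs per group, then rebuild a single map column
      .agg(map_from_entries(collect_list(struct($"key", $"value"))).as("attributes"))
      .withColumn("row_key", concat($"row_key", lit("_"), $"group"))
      .select($"partition_key", $"row_key", $"attributes", $"data_as_of_date")
      .show(false)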
    

    [Discussion]:

    • Thanks. A DataFrame row needs to be split into multiple rows, with a sequence number appended to row_key, only when the count of the map-type column (attributes) is greater than maxItem. Otherwise the row should not be changed at all. E.g. the row "456|456003|[key1 -> value1, key2 -> value2]|20210725" should stay as it is, rather than being changed to "456|456003_0|[key1 -> value1, key2 -> value2]|20210725". Please modify and help.
    • This is because the vast majority of my rows (99.99%) have an attribute count (map key-value pairs) below the maxItem threshold, and I want to avoid the extra processing for those rows. Only rows with attribute count > maxItem should go through this additional processing. Can we somehow check the attribute count first and only do the processing when it is greater than maxItem? Please help.
    • Sorry, I don't see a straightforward way to do that, and I'm out of time... Also, maybe you don't have to do it that way? I don't know your end goal, but you could keep the original row_key, rename the group column to sub_row_key, and then use it only when needed?
    • Sure. Let me try using df.map and writing custom code in the map function (a sketch of that idea follows below). Thanks for your help. I will mark your answer as correct.
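
    A minimal sketch of the flatMap-style idea from the last comment, reusing df and maxItem from the answer above and assuming the Dataset API with an illustrative case class (AttrRow is not from the original thread): rows whose map is within the limit pass through unchanged with their original row_key; only larger rows are split and suffixed with _<seq no>.

    import org.apache.spark.sql.Dataset
    import spark.implicits._

    // Illustrative case class mirroring the DataFrame schema in the question.
    case class AttrRow(partition_key: String,
                       row_key: String,
                       attributes: Map[String, String],
                       data_as_of_date: String)

    val result: Dataset[AttrRow] = df.as[AttrRow].flatMap { r =>
      if (r.attributes.size <= maxItem) {
        // Small maps are left untouched, keeping the original row_key.
        Seq(r)
      } else {
        // Split the map into chunks of at most maxItem entries and append
        // _<seq no> (starting at 1) to the row_key of each chunk.
        r.attributes.toSeq
          .grouped(maxItem)
          .zipWithIndex
          .map { case (chunk, i) =>
            r.copy(row_key = s"${r.row_key}_${i + 1}", attributes = chunk.toMap)
          }
          .toSeq
      }
    }

    result.show(false)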