【Posted】: 2026-02-15 02:05:02
【Problem Description】:
Spark DataFrame schema:
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
The attributes column is of map type and can contain more than 65535 key-value pairs.
I am writing this DataFrame to a Cassandra table. The table DDL is as follows:
create table <keyspace>.<table> (
    partition_key text,
    row_key text,
    attributes map<text,text>,
    data_as_of_date text,
    PRIMARY KEY (partition_key, row_key)
);
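For context, the write itself presumably goes through the DataStax spark-cassandra-connector, along these lines (keyspace and table names are placeholders, as in the DDL above):
// Assumes the spark-cassandra-connector is on the classpath;
// "my_keyspace" and "my_table" stand in for the real names.
df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
  .mode("append")
  .save()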
Problem:
In Cassandra, a map<text, text> column such as attributes cannot hold more than 65535 key-value pairs.
Question:
Could someone help me with a Scala-Spark snippet that splits a DataFrame row into multiple rows when the map-type column has more than 65535 key-value pairs?
For example,
if a DataFrame row has 163838 key-value pairs in the map column, that row should be split into 3 rows. row_key gets _ + a sequence number appended, so that the split rows still form unique Cassandra composite primary keys. The split rows would look like this (see also the sketch after the example):
<partition_key>, <row_key>_1, <65535 key value pairs from the map>, <data_as_of_date>
<partition_key>, <row_key>_2, <65535 key value pairs from the map>, <data_as_of_date>
<partition_key>, <row_key>_3, <32768 key value pairs from the map>, <data_as_of_date>
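The per-row arithmetic can be sanity-checked in plain Scala; grouped() from the standard library does exactly this kind of chunking (the 163838-entry map below is synthetic):
// Synthetic map with 163838 entries, chunked at the 65535 limit.
val attrs: Map[String, String] =
  (1 to 163838).map(i => s"key$i" -> s"value$i").toMap

val chunks = attrs.toSeq.grouped(65535).toList
// chunks.map(_.size) == List(65535, 65535, 32768): three rows, as above.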
Please use the sample DataFrame/code below. Any row whose map has more than 2 key-value pairs should be split into multiple rows.
val data = Seq(
  ("123", "123001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
  ("123", "123002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
  ("123", "123003", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
  ("456", "456001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
  ("456", "456002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
  ("456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725")
)
// Name the columns to match the schema above (the tuples would otherwise be _1 .. _4).
val df = spark.createDataFrame(data)
  .toDF("partition_key", "row_key", "attributes", "data_as_of_date")
The output DataFrame should look like this:
"123", "123001_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123001_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123001_3", Map("key5" -> "value5"), "20210725"
"123", "123002_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123002_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123002_3", Map("key5" -> "value5"), "20210725"
"123", "123003_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123003_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123003_3", Map("key5" -> "value5"), "20210725"
"456", "456001_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"456", "456001_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"456", "456001_3", Map("key5" -> "value5"), "20210725"
"456", "456002_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"456", "456002_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"456", "456002_3", Map("key5" -> "value5"), "20210725"
"456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
【Discussion】:
Tags: scala apache-spark cassandra