【Posted】: 2025-11-27 19:10:01
【Problem description】:
I am trying to transform a list of maps (Seq[Map[String, Map[String, String]]]) into an RDD of tuples, where every key -> value pair of each inner map is flat-mapped into a tuple together with the outer map's key. For example
Map(
  1 -> Map("k" -> "v", "k1" -> "v1")
)
becomes
(1, "k", "v")
(1, "k1", "v1")
I tried the approach below, but it seems to fail with what looks like a concurrency problem. I have two worker nodes, and each key -> value pair is emitted twice (I assume this is because I am doing something wrong).
Assume I keep the map data in a case class, Record.
val rdd = sc.parallelize(1 to records.length)
val recordsIt = records.iterator
val res: RDD[(String, String, String)] = rdd.flatMap(f => {
  val currItem = recordsIt.next()
  val x: immutable.Iterable[(String, String, String)] = currItem.mapData.map(v => {
    (currItem.identifier, v._1, v._2)
  })
  x
}).sortBy(r => r)
Is there a way to parallelize this job without running into the serious concurrency issues I suspect are occurring?
Sample duplicated output:
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)
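The duplicates are most likely not a concurrency bug inside Spark but the driver-side iterator: the closure passed to `flatMap` captures `recordsIt`, and that closure is serialized into each task separately. With two partitions over `1 to records.length`, each task starts from its own fresh copy of the iterator and re-reads the records from the beginning, so the first half of the data appears once per partition. A sketch of a fix, assuming a case class of the shape `Record(identifier: String, mapData: Map[String, String])`, is to parallelize the records themselves so every element already carries its identifier. Plain `Seq` is used below so the snippet runs without a SparkContext; with Spark the same chain would be `sc.parallelize(records).flatMap(...).sortBy(identity)`:

```scala
// Assumed (hypothetical) shape of the question's case class:
case class Record(identifier: String, mapData: Map[String, String])

// Illustrative data, not the question's real records:
val records = Seq(
  Record("id1", Map("CID" -> "c1", "ROD" -> "r1")),
  Record("id2", Map("CID" -> "c2"))
)

// Seq has the same flatMap/sortBy shape as RDD, so the logic is identical:
val res: Seq[(String, String, String)] = records
  .flatMap(rec => rec.mapData.map { case (k, v) => (rec.identifier, k, v) })
  .sortBy(identity)
// Each record contributes exactly one tuple per inner key, so nothing is duplicated.
```

Because no driver-side mutable state is captured in the closure, the result no longer depends on how Spark splits the work across partitions.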
【Discussion】:
Tags: scala apache-spark hadoop apache-spark-sql rdd