【问题标题】:Scala Nested Map to Spark RDDScala 嵌套映射到 Spark RDD
【发布时间】:2025-11-27 19:10:01
【问题描述】:

我正在尝试将映射列表 (Seq[Map[String, Map[String, String]]) 转换为 RDD 表/元组,其中映射中的每个键 -> 值对都平面映射到元组使用外部地图的密钥。例如

Map(
 1 -> Map('k' -> 'v', 'k1' -> 'v1')
)  

变成

(1, 'k', 'v')
(1, 'k1', 'v1')

我尝试了以下方法,但似乎在并发问题上失败了。我有两个工作节点,它复制了两次 key -> value(我认为这是因为我做错了)

假设我将地图类型保存在案例类“记录”中

  val rdd = sc.parallelize(1 to records.length)
    val recordsIt = records.iterator
      val res: RDD[(String, String, String)] = rdd.flatMap(f => {
        val currItem = recordsIt.next()
        val x: immutable.Iterable[(String, String, String)] = currItem.mapData.map(v => {
          (currItem.identifier, v._1, v._2)
        })
        x
      }).sortBy(r => r)

有没有办法在不遇到严重并发问题的情况下并行化这项工作(我怀疑正在发生?

示例重复输出

(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)

【问题讨论】:

    标签: scala apache-spark hadoop apache-spark-sql rdd


    【解决方案1】:

    Spark parallelize 从一开始就非常高效(因为您已经开始将数据存储在内存中,因此仅在本地迭代它的成本要低得多),但更惯用的方法是简单的flatMap

    sc.parallelize(records.toSeq)
      .flatMapValues(identity)
      .map { case (k1, (k2, v)) => (k1, k2, v) } 
    

    【讨论】:

    • 我同意它已经是本地的,但是有数亿行,它更快、更具可扩展性 - 并且惯用 :)