【问题标题】:How to access lookup(broadcast) RDD(or dataset) into other RDD map function如何将查找(广播)RDD(或数据集)访问到其他 RDD 映射函数
【发布时间】:2016-04-11 23:20:27
【问题描述】:

我是 spark 和 scala 的新手,刚刚开始学习……我在 CDH 5.1.3 上使用 spark 1.0.0

我得到了一个名为 dbTableKeyValueMap: RDD[(String, String)] 的广播 rdd,我想使用 dbTableKeyValueMap 来处理我的 fileRDD(每行有 300 多列)。这是代码:

val get = fileRDD.map({x =>
  val tmp = dbTableKeyValueMap.lookup(x)
  tmp
})

在本地运行此程序会挂起和/或一段时间后出现错误:

scala.MatchError: null
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)

我可以理解访问另一个 RDD 中的一个 RDD 会有问题,如果集合的位置和大小出现。对我来说,采用笛卡尔积不是选项,因为文件 RDD 中的记录很大(每行有 300 多列)。 ..就像我在设置方法中使用分布式缓存加载这个dbTableKeyValueMap并在hadoop java mapreduce代码的MAP中使用一样,我想在spark map中使用类似的方式......我找不到简单的例子来引用类似的用例。 .. 我想逐一迭代 fileRDD 行并在“每一列”上进行一些转换、美化、查找等以进行进一步处理...... 或者还有其他方法可以使用 dbTableKeyValueMap 作为 scala 集合而不是 spark RDD

请帮忙

【问题讨论】:

    标签: scala apache-spark spark-streaming rdd broadcasting


    【解决方案1】:

    谢谢.... 最简单的方法是将查找 RDD 转换为“scala 集合”,一切顺利!我可以使用任何 RDD 在转换中访问它....

    val scalaMap = dbTableKeyValueMap.collectAsMap.toMap
    val broadCastLookupMap = sc.broadcast(scalaMap)
    
    val get = fileRDD.map({x =>
      val tmp = broadCastLookupMap.value.get(x).head
      tmp
    })
    

    这个简单的解决方案应该记录在某个地方供早期学习者使用。我花了一段时间才弄清楚...

    感谢您的帮助...

    【讨论】:

    • 为了使这个解决方案可行,您需要 dbTableKeyValueMap 中的数据适合工作内存。
    【解决方案2】:

    我可以理解访问另一个 RDD 中的一个 RDD 会有问题,如果集合的位置和大小出现在图片中

    不是真的。它根本行不通。 Spark 不支持嵌套操作和转换。这意味着广播的RDD不能用于访问数据。

    通常您有三个选择:

    【讨论】:

      猜你喜欢
      • 2017-01-15
      • 1970-01-01
      • 1970-01-01
      • 2018-07-13
      • 1970-01-01
      • 2017-09-29
      • 1970-01-01
      • 2023-04-02
      • 2018-01-20
      相关资源
      最近更新 更多