【问题标题】:Checking if an RDD element is in another using the map function使用 map 函数检查一个 RDD 元素是否在另一个元素中
【发布时间】:2016-06-02 08:26:49
【问题描述】:

我是 Spark 的新手,想知道关于闭包的事情。
我有两个 RDD,一个包含一个 ID 列表和一个值,另一个包含一个选定 ID 的列表。
使用地图,我想增加元素的值,如果另一个RDD包含它的ID,就像这样。

val ids = sc.parallelize(List(1,2,10,5))
val vals = sc.parallelize(List((1, 0), (2, 0), (3,0), (4,0)))
vals.map( v => {
  if(ids.collect().contains(v._1)){
    (v._1, 1)
  } 
 })

但是,作业挂起并且永远不会完成。 这样做的正确方法是什么, 感谢您的帮助!

【问题讨论】:

    标签: scala apache-spark closures


    【解决方案1】:

    您的实现尝试在用于映射另一个的闭包内使用一个 RDD (ids) - 这在 Spark 应用程序中是不允许的:闭包中使用的任何内容都必须是可序列化(最好是小),因为它将被序列化并发送给每个工人。

    这些 RDD 之间的 leftOuterJoin 应该可以满足您的需求:

    val ids = sc.parallelize(List(1,2,10,5))
    val vals = sc.parallelize(List((1, 0), (2, 0), (3,0), (4,0)))
    val result = vals
            .leftOuterJoin(ids.keyBy(i => i))
            .mapValues({ 
                case (v, Some(matchingId)) => v + 1  // increase value if match found
                case (v, None) => v                  // leave value as-is otherwise
            }) 
    

    leftOuterJoin 需要两个键值 RDD,因此我们使用标识函数人为地从 ids RDD 中提取一个键。然后我们将每个结果 (id: Int, (value: Int, matchingId: Option[Int])) 记录的映射到 v 或 v+1。

    通常,在使用 Spark 时,您应始终尽量减少使用 collect 等操作,因为此类操作会将数据从分布式集群移回驱动程序应用程序。

    【讨论】:

    • 感谢您的详细解答!
    猜你喜欢
    • 2016-06-29
    • 2020-12-03
    • 2019-02-12
    • 1970-01-01
    • 1970-01-01
    • 2016-12-31
    • 1970-01-01
    • 2017-08-04
    • 1970-01-01
    相关资源
    最近更新 更多