【问题标题】:search rdd for value from another rdd从另一个 rdd 中搜索 rdd 的值
【发布时间】:2015-08-07 05:01:06
【问题描述】:

我正在使用 Spark + Scala。我的 rdd1 有客户信息,即 (id, [name, address])。 rdd2 只有知名客户的名字。现在我想查找 rdd1 中的客户是否高调。如何使用另一个搜索一个 rdd?加入 rdd 对我来说似乎不是一个好的解决方案。

我的代码:

val result = rdd1.map( case (id, customer) => 
  customer.foreach ( c => 
    rdd2.filter(_ == c._1).count()!=0 ))

错误org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;

【问题讨论】:

  • "加入 rdd 对我来说并不是一个好的解决方案。"为什么不呢?
  • 因为 rdd 没有公共密钥,内部连接不会让已经很大的 rdd 变得超级庞大?
  • 谢谢保罗。我不得不刷新我的加入知识。对内连接和外连接感到困惑。

标签: scala apache-spark rdd


【解决方案1】:

你必须通过收集来广播一个rdd。您可以广播较小的 rdd 以提高性能。

val bcastRdd = sc.broadcast(rdd2.collect)
rdd1.map(
   case (id, customer) => customer.foreach(c => 
        bcastRdd.value.filter(_ == c._1).count()!=0))

【讨论】:

  • 广播在这个例子中没有任何区别。闭包捕获的变量无论如何都会被广播。当您想在多个阶段使用某些东西时,您只需要显式广播。
【解决方案2】:

您可以使用左外连接,以避免进行昂贵的操作,例如收集(如果您的 RDD 很大)

正如丹尼尔所指出的,广播不是必需的。

这是一个 sn-p,它可以帮助获得带有标志的 RDD1,该标志表示他是高调客户或低调客户。

val highProfileFlag = 1
val lowProfileFlag = 0 

// Keying rdd 1 by the name    
val rdd1Keyed = rdd1.map { case (id, (name, address)) => (name, (id, address)) }

// Keying rdd 2 by the name and adding a high profile flag
val rdd2Keyed = rdd2.map { case name => (name, highProfileFlag) }

// The join you are looking for is the left outer join
val rdd1HighProfileFlag = rdd1Keyed
.leftOuterJoin(rdd2Keyed)
.map { case (name, (id, address), highProfileString) => 
      val profileFlag = highProfileString.getOrElse(lowProfileFlag) 
      (id , (name, address, profileFlag))
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-23
    • 1970-01-01
    • 2016-12-16
    • 2016-12-23
    • 2016-03-21
    • 1970-01-01
    相关资源
    最近更新 更多