【问题标题】:Difference between `join` and `union` followed by `groupByKey` in Spark?Spark中`join`和`union`后跟`groupByKey`的区别?
【发布时间】:2016-03-18 21:12:48
【问题描述】:

我找不到很好的理由:

anRDD.join(anotherRDD)

应该不同于:

anRDD.union(anotherRDD).groupByKey()

但是,后者给了我一个错误,而前者没有。如果绝对需要,我可以提供一个示例,但我想从功能抽象的角度了解。我问过的没有人可以给我一个很好的解释。

【问题讨论】:

  • 恕我直言,提供数据示例总是更好。因为,加入交易对

标签: join apache-spark group-by union pyspark


【解决方案1】:

以下几点我会用下面的代码来说明:

  • join 与两个 rdds 一起工作,每个 rdds 由对组成,并且具有需要匹配的相同密钥。两个rdds的值类型不需要匹配。生成的 rdd 将始终具有 (Key, (Value1, Value2)) 类型的条目
  • 如果anRDDanotherRDD 具有不同类型的值,anRDD.union(anotherRDD).groupByKey() 将产生错误;如果键和值都具有相同的类型,则不会产生错误。结果将是类型为 (Key, Iterable[Value]) 的条目,其中 Iterable 不需要像连接的情况那样具有长度 2。

例子:

val rdd1 = sc.parallelize(Seq(  ("a", 1) , ("b", 1)))
val rdd2 = sc.parallelize(Seq(  ("a", 2) , ("b", 2)))
val rdd3 = sc.parallelize(Seq(  ("a", 2.0) , ("b", 2.0))) // different Value type
val rdd4 = sc.parallelize(Seq(  ("a", 1) , ("b", 1), ("a", 5) , ("b", 5)))
val rdd5 = sc.parallelize(Seq(  ("a", 2) , ("b", 2), ("a", 5) , ("b", 5)))

产生以下内容:

scala> rdd1.join(rdd2)
res18: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[77] at join at <console>:26

scala> rdd1.union(rdd2).groupByKey
res19: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[79] at groupByKey at <console>:26

scala> rdd1.union(rdd3).groupByKey
<console>:26: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Double)]
 required: org.apache.spark.rdd.RDD[(String, Int)]
              rdd1.union(rdd3).groupByKey

如果您的 rdds 中有重复的键,请注意会产生不同的结果:

scala> rdd4.union(rdd5).groupByKey.collect.mkString("\n")
res21: String = 
(a,CompactBuffer(1, 5, 2, 5))
(b,CompactBuffer(1, 5, 2, 5))

scala> rdd4.join(rdd5).collect.mkString("\n")
res22: String = 
(a,(1,2))
(a,(1,5))
(a,(5,2))
(a,(5,5))
(b,(1,2))
(b,(1,5))
(b,(5,2))
(b,(5,5))

编辑:OP 使用的是 Python,而不是 Scala。 Python 和 Scala 在类型安全方面存在差异。如上图所示,Scala 将捕获两个 RDD 之间的类型不匹配等问题; Python 不会立即捕获它,但稍后当您尝试对错误类型的对象应用方法时会产生神秘错误。请记住,Spark 是用 Scala 和 Python API 编写的。

确实,我在评论中尝试了 OP 代码,并且在 pyspark 中,它适用于像 count() 这样的简单操作。但是,如果您例如尝试对每个值求平方(您可以对整数执行此操作,但不能对字符串执行此操作),则会产生错误

这是数据:注意我省略了列表,我只有值 1 和 0。

B = [('b',1), ('c',0)]
C = [('b', 'bs'), ('c', 'cs')]
anRDD = sc.parallelize(B)
anotherRDD = sc.parallelize(C)

这是输出:

>>> anRDD.join(anotherRDD).count()
2
>>> anRDD.union(anotherRDD).groupByKey().count()
2
>>> for y in anRDD.map(lambda (a, x): (a, x*x)).collect():
...   print y
... 
('b', 1)
('c', 0)
>>> for y in anRDD.union(anotherRDD).map(lambda (a, x): (a, x*x)).collect():
...   print y
... 
15/12/13 15:18:51 ERROR Executor: Exception in task 5.0 in stage 23.0 (TID 169)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):

【讨论】:

  • 我尝试了 uniongroupByKey 与两个 RDD,每个 RDD 都有不同的值类型,它不会给我一个错误:B = (('b',[1]), ('c',[0]))C = (('b',['bs']), ('c',['cs']))anRDD = sc.parallelize(B)anotherRDD = sc.parallelize(C)B_C_unionRDD = anRDD.union(anotherRDD).groupByKey()。它不会引发错误。
  • 我理解你关于 join 是一个元组的结果以及 union-groupByKey 是一个可迭代的结果的观点。但即使我将可迭代对象放入listlist(&lt;iterable&gt;),似乎也不应该有任何区别。
  • 我编辑了关于 Scala 和 Python 之间类型安全差异的答案。如果觉得有用,请考虑点赞。
【解决方案2】:

前者和后者有不同的结果集:

  • 以前的:

    (K, V).join(K, W) = (K, (V, W))
    

    前者的结果是equi-join,SQL类比:

    anRDD.K = anotherRDD.K
    
  • 后者:

    不仅包括 equi-join 结果,还包括 anRDD 中的非匹配部分和 anotherRDD 中的非匹配部分。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2010-10-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多