以下几点我会用下面的代码来说明:
-
join 与两个 rdds 一起工作,每个 rdds 由对组成,并且具有需要匹配的相同密钥。两个rdds的值类型不需要匹配。生成的 rdd 将始终具有 (Key, (Value1, Value2)) 类型的条目
-
如果
anRDD 和anotherRDD 具有不同类型的值,anRDD.union(anotherRDD).groupByKey() 将产生错误;如果键和值都具有相同的类型,则不会产生错误。结果将是类型为 (Key, Iterable[Value]) 的条目,其中 Iterable 不需要像连接的情况那样具有长度 2。
例子:
val rdd1 = sc.parallelize(Seq( ("a", 1) , ("b", 1)))
val rdd2 = sc.parallelize(Seq( ("a", 2) , ("b", 2)))
val rdd3 = sc.parallelize(Seq( ("a", 2.0) , ("b", 2.0))) // different Value type
val rdd4 = sc.parallelize(Seq( ("a", 1) , ("b", 1), ("a", 5) , ("b", 5)))
val rdd5 = sc.parallelize(Seq( ("a", 2) , ("b", 2), ("a", 5) , ("b", 5)))
产生以下内容:
scala> rdd1.join(rdd2)
res18: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[77] at join at <console>:26
scala> rdd1.union(rdd2).groupByKey
res19: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[79] at groupByKey at <console>:26
scala> rdd1.union(rdd3).groupByKey
<console>:26: error: type mismatch;
found : org.apache.spark.rdd.RDD[(String, Double)]
required: org.apache.spark.rdd.RDD[(String, Int)]
rdd1.union(rdd3).groupByKey
如果您的 rdds 中有重复的键,请注意会产生不同的结果:
scala> rdd4.union(rdd5).groupByKey.collect.mkString("\n")
res21: String =
(a,CompactBuffer(1, 5, 2, 5))
(b,CompactBuffer(1, 5, 2, 5))
scala> rdd4.join(rdd5).collect.mkString("\n")
res22: String =
(a,(1,2))
(a,(1,5))
(a,(5,2))
(a,(5,5))
(b,(1,2))
(b,(1,5))
(b,(5,2))
(b,(5,5))
编辑:OP 使用的是 Python,而不是 Scala。 Python 和 Scala 在类型安全方面存在差异。如上图所示,Scala 将捕获两个 RDD 之间的类型不匹配等问题; Python 不会立即捕获它,但稍后当您尝试对错误类型的对象应用方法时会产生神秘错误。请记住,Spark 是用 Scala 和 Python API 编写的。
确实,我在评论中尝试了 OP 代码,并且在 pyspark 中,它适用于像 count() 这样的简单操作。但是,如果您例如尝试对每个值求平方(您可以对整数执行此操作,但不能对字符串执行此操作),则会产生错误
这是数据:注意我省略了列表,我只有值 1 和 0。
B = [('b',1), ('c',0)]
C = [('b', 'bs'), ('c', 'cs')]
anRDD = sc.parallelize(B)
anotherRDD = sc.parallelize(C)
这是输出:
>>> anRDD.join(anotherRDD).count()
2
>>> anRDD.union(anotherRDD).groupByKey().count()
2
>>> for y in anRDD.map(lambda (a, x): (a, x*x)).collect():
... print y
...
('b', 1)
('c', 0)
>>> for y in anRDD.union(anotherRDD).map(lambda (a, x): (a, x*x)).collect():
... print y
...
15/12/13 15:18:51 ERROR Executor: Exception in task 5.0 in stage 23.0 (TID 169)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):