【发布时间】:2016-05-05 07:30:02
【问题描述】:
我无法让 Sparks DataFrame join 工作(没有产生结果)。这是我的代码:
val e = Seq((1, 2), (1, 3), (2, 4))
var edges = e.map(p => Edge(p._1, p._2)).toDF()
var filtered = edges.filter("start = 1").distinct()
println("filtered")
filtered.show()
filtered.printSchema()
println("edges")
edges.show()
edges.printSchema()
var joined = filtered.join(edges, filtered("end") === edges("start"))//.select(filtered("start"), edges("end"))
println("joined")
joined.show()
它需要在顶层定义case class Edge(start: Int, end: Int)。这是它产生的输出:
filtered
+-----+---+
|start|end|
+-----+---+
| 1| 2|
| 1| 3|
+-----+---+
root
|-- start: integer (nullable = false)
|-- end: integer (nullable = false)
edges
+-----+---+
|start|end|
+-----+---+
| 1| 2|
| 1| 3|
| 2| 4|
+-----+---+
root
|-- start: integer (nullable = false)
|-- end: integer (nullable = false)
joined
+-----+---+-----+---+
|start|end|start|end|
+-----+---+-----+---+
+-----+---+-----+---+
我不明白为什么输出是空的。为什么filtered 的第一行没有与edges 的最后一行合并?
【问题讨论】:
标签: scala apache-spark apache-spark-sql