【Title】: spark sql dataframe join with renaming in a loop
【Posted】: 2016-05-06 14:00:26
【Question】:

I am trying to compute the transitive closure of a DataFrame. After a few iterations I get an internal Spark exception. Any ideas what causes it and how to fix it? Here is my program:

import sqlContext.implicits._ // for toDF(); already in scope inside spark-shell

val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17), (17, 18), (18, 19))
var edges = e.map(p => Edge(p._1, p._2)).toDF()
var filtered = edges
  .filter("start = 1")
  .distinct()
  .withColumnRenamed("start", "fStart")
  .withColumnRenamed("end", "fEnd")

var i = 0
while (i < 30) {
  i = i + 1
  println("\n i = " + i)
  filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("start", "fStart")
    .withColumnRenamed("end", "fEnd").distinct
  filtered.show
}

It relies on a simple case class defined at the top level:

case class Edge(start: Int, end: Int)

Here is the output up to the exception; after it, Spark hangs for a while and then exits with the error Executor heartbeat timed out:

 i = 1
+------+----+
|fStart|fEnd|
+------+----+
|     1|   4|
+------+----+


 i = 2
+------+----+
|fStart|fEnd|
+------+----+
|     1|   5|
+------+----+


 i = 3
+------+----+
|fStart|fEnd|
+------+----+
|     1|   6|
+------+----+
...

 i = 10
+------+----+
|fStart|fEnd|
+------+----+
|     1|  13|
+------+----+


 i = 11
16/01/29 00:28:59 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field    org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
... 32 more
+------+----+
|fStart|fEnd|
+------+----+
|     1|  14|
+------+----+
...

PS1. How can this kind of join be done without renaming the columns? (A sketch follows below.)
PS2. Is there any documentation on using DataFrames this way? The API docs are very terse.
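Regarding PS1, one hypothetical approach is to alias the DataFrames and use qualified column references in the join and select, so that no withColumnRenamed is needed on the inputs. A minimal sketch, assuming the $-column syntax from import sqlContext.implicits._ (the names f, ed, and next are made up for illustration):

val f = filtered.as("f")
val ed = edges.as("e")
val next = f
  .join(ed, $"f.fEnd" === $"e.start")                     // qualified references instead of renamed columns
  .select($"f.fStart".as("fStart"), $"e.end".as("fEnd"))  // alias only the output columns
  .distinct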

【Comments】:

  • Try adding a cache at the end of each iteration: filtered.cache(), before calling show.

Tags: sql scala join apache-spark dataframe


【Answer 1】:

These errors tend to appear only when the cluster does not have enough resources to serve the requests; the backlog keeps growing, and after a while the errors show up.

To fix your problem, add filtered.cache before filtered.show.

Also, after the 16th iteration there will be no more results, because there will be no matches for filtered.fEnd === edges.start.
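A minimal sketch of the loop with that fix applied (the rename of start is dropped, because after the select the columns are already named fStart and end):

var i = 0
while (i < 30) {
  i = i + 1
  println("\n i = " + i)
  filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("end", "fEnd")
    .distinct
    .cache()        // persist this iteration's result so later actions do not replay the whole lineage
  filtered.show     // the action that materializes the cached DataFrame
}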

【Comments】:

  • Indeed, adding filtered.cache helped (thanks). I understand that after some iterations the program keeps looping without producing anything, only to surface this error. What I don't understand is how resources can be lacking: this is a very simple example with a few ONE-ROW DataFrames and a couple of empty ones. And why does caching help here?
  • After I added filtered.cache and ran the loop for more iterations, I can see that each iteration takes longer. Why is that? After a while they all produce the same empty DataFrame. PS. @Sumit, can you point me to some documentation or a YouTube video?
  • The DAG can point you to the exact cause, but it appears that each new iteration also reprocesses all previous iterations; for example, at i=2 it processes the data of i=1 and then the data of i=2. Remember that Spark keeps the transformations to be applied to the data (the data lineage), not the result/output of those transformations. To persist the output you specifically need to call RDD.cache.
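To make the lineage point concrete, a minimal sketch using only the question's own DataFrames: cache only marks the data for persistence, and an action is needed to actually materialize it.

val step = filtered
  .join(edges, filtered("fEnd") === edges("start"))
  .select(filtered("fStart"), edges("end").as("fEnd"))
  .distinct

// Without cache, every step.show re-executes the full chain of previous
// joins, so each iteration's plan (and runtime) keeps growing.
step.cache()   // lazy: only marks the DataFrame for persistence
step.count()   // an action forces materialization into the cache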