【问题标题】:StackOverflowError when doing iterative computing using Apache-Spark使用 Apache-Spark 进行迭代计算时出现 StackOverflowError
【发布时间】:2016-02-27 11:19:11
【问题描述】:

如果一个 RDD 对象有非空的.dependencies,这是否意味着它有血缘关系?我怎样才能删除它?

我正在进行迭代计算,每次迭代都取决于上一次迭代中计算的结果。多次迭代后,会抛出StackOverflowError

起初我尝试使用cache,我阅读了pregel.scala 中的代码,这是GraphX 的一部分,他们使用count 方法在cache 之后实现对象,但我附加了一个调试器,似乎这种方法不会为空.dependencies,这在我的代码中也不起作用。

另一种替代方法是使用checkpoint,我为Graph 对象尝试了checkpoint 顶点和边,然后通过count 顶点和边实现它。然后我使用.isCheckpointed 来检查它是否正确设置了检查点,但它总是返回false。

更新 我编写了一个可以重现问题的简化版本的代码。

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("HDTM")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
    val sc = new SparkContext(conf)

    val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
    val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
    val newGraph = Graph(v, e)
    var currentGraph = newGraph
    val vertexIds = currentGraph.vertices.map(_._1).collect()

    for (i <- 1 to 1000) {
      var g = currentGraph
      vertexIds.toStream.foreach(id => {
        g = Graph(currentGraph.vertices, currentGraph.edges)
        g.cache()
        g.edges.cache()
        g.vertices.cache()
        g.vertices.count()
        g.edges.count()
      })

      currentGraph.unpersistVertices(blocking =  false)
      currentGraph.edges.unpersist(blocking = false)
      currentGraph = g
      println(" iter "+i+" finished")
    }

  }

更新

这是代码,我删除了大部分不必要的方法,因此代码行最小化,但如果你考虑它的功能,它可能没有意义。

object StackOverFlow {
  final val PATH = "./"

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("HDTM")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
    val sc = new SparkContext(conf)
    val filePath = PATH + "src/test/resources/binary.txt"
    val wikiGraph: Graph[WikiDataVertex, Double] = WikiGraphLoader.loadGraphFromTestHDTMFile(sc, filePath)
    wikiGraph.cache()
    val root = 0L
    val bfsGraph = GraphAlgorithm.initializeGraph(wikiGraph, root, sc)
    bfsGraph.cache()
    val vertexIds = bfsGraph.vertices.map(_._1).collect()
    var currentGraph = bfsGraph

    for (i <- 1 to 1000) {
      var g = currentGraph
      vertexIds.toStream.foreach(id => {
          g = samplePath(g, id, root)
      })

      currentGraph.unpersistVertices(blocking =  false)
      currentGraph.edges.unpersist(blocking = false)
      currentGraph = g
      println(" iter "+i+" finished")
    }

  }

  def samplePath[ED: ClassTag](graph: Graph[WikiDataVertex, ED],
                               instance: VertexId, root: VertexId): Graph[WikiDataVertex, ED] = {

    if(instance == 0L) return graph

    val (removedGraph, remainedGraph) = splitGraph(graph, instance)

    /**
     * Here I omit some other code, which will change the attributes of removedGraph and remainedGraph
     */

    val newVertices = graph.outerJoinVertices(removedGraph.vertices ++ remainedGraph.vertices)({
      (vid, vd, opt) => {
        opt.getOrElse(vd)
      }
    }).vertices

    val newEdges = graph.edges.map(edge => {
      if (edge.dstId == instance)
        edge.copy(srcId = edge.srcId) 
        // In the real case edge.srcId will be replaced by an vertexId calculated by other functions
      else
        edge.copy()
    })

    val g = Graph(newVertices, newEdges)
    g.vertices.cache()
    g.edges.cache()
    g.cache()
    g.vertices.count()
    g.edges.count()

    remainedGraph.unpersistVertices(blocking = false)
    remainedGraph.edges.unpersist(blocking = false)
    removedGraph.unpersistVertices(blocking = false)
    removedGraph.edges.unpersist(blocking = false)

    g
  }

  /**
   * Split a graph into two sub-graph by an vertex `instance`
   * The edge that ends at `instance` will lose
   * @param graph Graph that will be separated
   * @param instance Vertex that we are using to separate the graph
   * @tparam ED Edge type
   * @return (sub-graph with `instance`, sub-graph without `instance`)
   **/
  def splitGraph[ED: ClassTag]
  (graph: Graph[WikiDataVertex, ED], instance: VertexId): (Graph[WikiDataVertex, ED], Graph[WikiDataVertex,ED]) = {
    val nGraph = GraphAlgorithm.graphWithOutDegree(graph)
    // This will need twice, cache it to prevent re-computation
    nGraph.cache()

    val wGraph = nGraph.subgraph(epred = e => e.dstAttr._1.path.contains(instance) ||
      e.srcAttr._1.path.contains(instance),
      vpred = (id, vd) => vd._1.path.contains(instance))

    val woGraph = nGraph.subgraph(epred = e => !e.dstAttr._1.path.contains(instance) &&
      !e.srcAttr._1.path.contains(instance),
      vpred = (id, vd) => !vd._1.path.contains(instance))

    val removedGraph = Graph(wGraph.vertices.mapValues(_._1), wGraph.edges, null)
    val remainedGraph = Graph(woGraph.vertices.mapValues(_._1), woGraph.edges, null)

    removedGraph.vertices.count()
    removedGraph.edges.count()
    removedGraph.cache()
    remainedGraph.vertices.count()
    remainedGraph.edges.count()
    remainedGraph.cache()

    nGraph.unpersistVertices(blocking = false)
    nGraph.edges.unpersist(blocking = false)

    (removedGraph, remainedGraph)
  }

}

在前 10 次迭代中,它运行得很快,之后每次迭代都需要更多时间。我查看Spark WebUI,每个操作的实际执行时间几乎是一样的,但是随着迭代次数的增加,Scheduler Delay也会增加。在 20 次迭代之后,它会抛出 StackOverflowError。

【问题讨论】:

  • 你能把你的代码吗?似乎StackOverflowError 与RDD 血统无关。
  • StackOverflow 可能与某些非终止递归有关,而不是与 RDD 沿袭有关。是的,显示一些代码。
  • @maasg 我已附上代码,感谢您的帮助!
  • @bxshi 你能附上堆栈跟踪吗?它会加快解决问题的速度。

标签: scala apache-spark


【解决方案1】:
val g = loadEdgeFile(sc, edge_pt, n_partition)

g.edges.foreachPartition(_ => Unit)
g.vertices.foreachPartition(_ => Unit)

g.checkpoint()

g.edges.foreachPartition(_ => Unit)
g.vertices.foreachPartition(_ => Unit)
println(s"is cp: ${g.isCheckpointed}"

为了得到一个图检查点,它应该满足三个条件:

  • 该图之前没有具体化;
  • 然后你检查它;
  • 您应该同时实现顶点和边。 然后你检查图表的状态,你会得到一个真实的答案。

【讨论】:

    猜你喜欢
    • 2015-10-07
    • 2020-06-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-07
    • 2016-10-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多