【Question】: Spark Streaming, Kafka: java.lang.StackOverflowError
【Posted】: 2016-06-13 09:56:15
【Description】:

I am getting an error in my spark-streaming application, which uses Kafka as its input stream. When I use a socket source it works fine, but after switching to Kafka it throws the error below. Does anyone know why? Do I need to change the batch interval or the checkpoint interval?

ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.StackOverflowError

My program:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try

object HBaseStream {

  // checkpointDirectory was undefined in the original snippet; in production
  // it must point to a fault-tolerant store such as HDFS
  val checkpointDirectory = "/tmp/checkpoint"

  def main(args: Array[String]): Unit = {

    // Function to create and set up a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("HBaseStream")
      val sc = new SparkContext(conf)
      // create a StreamingContext, the main entry point for all streaming functionality
      val ssc = new StreamingContext(sc, Seconds(5))
      val brokers = args(0)
      val topics = args(1)
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

      val inputStream = messages.map(_._2)
      // val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
      ssc.checkpoint(checkpointDirectory)
      inputStream.print(1)
      val parsedStream = inputStream
        .map(line => {
          val splitLines = line.split(",")
          (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
        })

      // running total per key: prepend the previous sums, then add all arrays element-wise;
      // Try drops the batch if the array lengths do not match
      val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
        (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
          prev.map(_ +: current).orElse(Some(current))
            .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
        })

      state.checkpoint(Duration(10000))
      state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
      ssc
    }
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    context.start()            // the reported error is raised while starting the context
    context.awaitTermination()
  }
}
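The heart of the job is the `updateStateByKey` closure: it keeps a running total per key by summing the previous state array together with every array that arrived in the current batch. The same merge logic can be sketched in plain Scala without Spark or breeze (names here are illustrative; note that `zip` silently truncates on mismatched lengths, whereas the breeze version's `Try` drops the whole batch instead):

```scala
object StateMergeSketch {
  // Stand-in for the updateStateByKey closure: prepend the previous running
  // totals (if any) to the current batch's arrays, then reduce by
  // element-wise addition.
  def mergeState(current: Seq[Array[Long]], prev: Option[Array[Long]]): Option[Array[Long]] = {
    val all = prev.map(_ +: current).getOrElse(current)
    if (all.isEmpty) None
    else Some(all.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))
  }

  def main(args: Array[String]): Unit = {
    // previous state (10, 20) plus one new batch array (1, 2)
    val merged = mergeState(Seq(Array(1L, 2L)), Some(Array(10L, 20L)))
    println(merged.map(_.mkString(",")))  // Some(11,22)
  }
}
```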

【Comments】:

    Tags: scala apache-kafka spark-streaming spark-streaming-kafka


    【Solution 1】:

    Try deleting the checkpoint directory.

    I'm not sure, but it looks like your streaming context is failing to recover from the checkpoint; with the directory gone, getOrCreate builds a fresh context instead of trying to deserialize the old state.

    In any case, it worked for me.
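    For a checkpoint directory on the local filesystem, wiping it before a restart can be done with a few lines of plain Scala (the path is hypothetical; a checkpoint on HDFS would need the Hadoop FileSystem API instead):

```scala
import java.nio.file.{Files, Path, Paths}
import java.util.Comparator

object ClearCheckpoint {
  // Recursively delete a local checkpoint directory, if it exists,
  // so that StreamingContext.getOrCreate creates a fresh context.
  def clear(dir: String): Unit = {
    val root = Paths.get(dir)
    if (Files.exists(root)) {
      Files.walk(root)
        .sorted(Comparator.reverseOrder[Path]())  // delete children before parents
        .forEach(p => Files.delete(p))
    }
  }

  def main(args: Array[String]): Unit = {
    clear("/tmp/checkpoint")  // hypothetical local checkpoint path
  }
}
```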

    【Discussion】:
