【问题标题】:Spark not able to fetch events from Amazon KinesisSpark 无法从 Amazon Kinesis 获取事件
【发布时间】:2016-02-23 02:00:59
【问题描述】:

我最近一直在尝试从 Kinesis 获取 Spark 读取事件,但在接收事件时遇到问题。虽然 Spark 能够连接到 Kinesis 并能够从 Kinesis 获取元数据,但它无法从中获取事件。它总是取回零个元素。

没有错误,只是将结果清空。 Spark 能够获取元数据(例如,kinesis 中的分片数量等)。

我已经使用这些 [1 & 2] 指南让它工作,但还没有得到太多的运气。我还尝试了来自 SO [3] 的一些建议。集群有足够的资源/核心可用。

我们发现 Spark 和 Kinesis 之间的 Protobuf 版本存在版本冲突,这也可能是导致此行为的原因。 Spark 使用 protobuf-java 2.5.0 版本,而 kinesis 可能使用 protobuf-java-2.6.1.jar。

只是想知道是否有人遇到过这种行为,或者是否有 spark 使用 kinesis。

已尝试使用 Spark 1.5.0、Spark 1.6.0。

  1. http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
  2. https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

  3. Apache Spark Kinesis Sample not working

【问题讨论】:

    标签: apache-spark emr amazon-kinesis


    【解决方案1】:

    回答我自己的问题 -

    我在 Spark Kinesis 集成方面取得了一些成功,关键是 unionStreams.foreachRDD。

    有 2 个版本的 foreachRDD 可用

    • unionStreams.foreachRDD
    • unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time)

    由于某种原因,第一个无法让我得到结果,但更改为第二个可以让我得到预期的结果。尚待探究原因。

    在下面添加代码sn-p以供参考。

    还可以考虑更改此设置。这对我也有帮助-

    "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesnt work
    "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1",  // Works
    

    希望它可以帮助某人:)

    感谢大家的帮助。

    val kinesisStreams = (0 until numStreams).map {
      count =>
        val stream = KinesisUtils.createStream(
          ssc,
          consumerName,
          streamName,
          endpointUrl,
          regionName,
          InitialPositionInStream.TRIM_HORIZON,
          kinesisCheckpointInterval,
          StorageLevel.MEMORY_AND_DISK_2
        )
    
        stream
    }
    val unionStreams = ssc.union(kinesisStreams)
    
    println(s"========================")
    println(s"Num of streams: ${numStreams}")
    println(s"========================")
    
    /*unionStreams.foreachRDD{ // Doesn't Work !!
      rdd =>
        println(rdd.count)
        println("rdd isempty:" + rdd.isEmpty)
    }*/ 
    unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { // Works, Yeah !!
      println(rdd.count)
      println("rdd isempty:" + rdd.isEmpty)
      }
    )
    
    ssc.start()
    ssc.awaitTermination()
    

    【讨论】:

      猜你喜欢
      • 2015-08-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多