【问题标题】:Spark Streaming Direct Kafka API, OffsetRanges : How to handle first runSpark Streaming Direct Kafka API,OffsetRanges:如何处理首次运行
【发布时间】:2016-06-27 17:14:03
【问题描述】:

我的 spark-streaming 应用程序在没有 ZooKeeper 帮助的情况下使用直接流方法从 Kafka 读取数据。我想处理失败,例如在我的应用程序中遵循 Exactly-once Semantics。我关注this 以供参考。一切看起来都很完美,除了:

val stream: InputDStream[(String,Long)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Long)](
      ssc, kafkaParams, fromOffsets,
      // we're just going to count messages per topic, don't care about the contents, so convert each message to (topic, 1)
      (mmd: MessageAndMetadata[String, String]) => (mmd.topic, 1L))

在应用程序的第一次运行中,由于不会读取偏移量,所以要为fromOffsets Map 参数传递什么值?我当然错过了一些东西。

感谢并感谢任何帮助!

【问题讨论】:

  • 第一个偏移量是0L——你想用它作为每个TopicAndPartitionfromOffset

标签: scala apache-spark apache-kafka spark-streaming kafka-consumer-api


【解决方案1】:

第一个偏移量不一定是 0L,取决于主题存在的时间。

我个人只是单独将适当的偏移量预先插入到数据库中。然后 spark 作业在启动时从数据库中读取偏移量。

spark Kafka 集成中的文件 kafkacluster.scala 有一些方法可以更轻松地查询 Kafka 的最早可用偏移量。该文件是私有的,但已在最新的 spark 代码中公开。

【讨论】:

  • 偏移量永远不会小于 0L -- 所以如果你总是使用 0L 它总是从头开始,即使开头是 32L 或其他什么
  • @CodyKoeninger 是的,即使我决定在第一次运行时手动插入偏移量。仍在研究是否有更好的方法,因为对于每个主题的每个分区,我都必须手动执行此操作:(
  • @DavidGriffin 不,我试过 0L。它给了我 OffsetOutOfRange 异常。由于偏移量将不再可用,根据保留期,我猜它不会起作用。
  • 您始终可以使用 Offset Fetch 并在开始之前获取值。
  • @Cody 如果启用,偏移量是否不会存储在检查点目录中?
猜你喜欢
  • 2018-01-13
  • 2017-02-06
  • 1970-01-01
  • 1970-01-01
  • 2021-05-22
  • 1970-01-01
  • 1970-01-01
  • 2018-10-26
  • 2019-06-28
相关资源
最近更新 更多