【问题标题】:DStream filtering and offset management in Spark Streaming KafkaSpark Streaming Kafka 中的 DStream 过滤和偏移管理
【发布时间】:2018-04-12 20:42:10
【问题描述】:

我目前正在编写一个 Spark 流应用程序,它从 Kafka 读取数据并尝试在应用一些转换之前对其进行解码。

目前的代码结构如下:

val stream = KafkaUtils.createDirectStream[String, String](...)
 .map(record => decode(record.value())
 .filter(...)
 .foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   ...
   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
 }

失败的解码和过滤发生在DStream上,偏移量管理在foreachRDD内部完成,也就是说我只会提交成功的记录。

要提交失败的记录,我可以移动 foreachRDD 循环中的所有内容:

val stream = KafkaUtils.createDirectStream[String, String](...)
 .foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   ...
   // Decoding and filtering here
   ...
   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
 }

但是,我想知道是否还有其他方法可以提交失败的记录。也许不提交失败的记录是可以接受的?

【问题讨论】:

    标签: scala apache-kafka spark-streaming


    【解决方案1】:

    我假设您使用的是 spark-streaming-kafka 库。

    阅读offset Ranges 的文档,它存储了主题分区的偏移量范围。它不会过滤掉或根据客户端过滤“.filter(…)”操作在该范围内标记单个偏移量。因此,如果您提交该 offsetRanges,它将提交每个分区的最高偏移量,而不管您的过滤器操作如何。

    这是有道理的,因为您的消费者正在告诉 Kafka 代理,或者更准确地说,是告诉 Group Coordinator 它使用了这些消息。协调器对您实际对数据执行的操作不感兴趣,它只是想知道该特定消费者组是否正在读取消息/偏移量。

    回到您的问题...

    我想知道是否还有其他方法可以提交失败的记录。

    虽然看起来您并不需要它,但是是的,还有另一种提交“失败”记录的方法。你可以enable auto commit。与消费者配置 auto.commit.interval.ms 一起,您可以定期提交消费者从主题轮询的偏移量。

    也许不提交失败的记录是可以接受的?

    我不了解您的特定用例,但不提交失败的记录是可以接受的。如上所述,组协调器对消费者已消费的每个分区的最高偏移量感兴趣。如果你消费一个包含 10 条消息的主题,你从头开始读取并且只提交第 9 个偏移量(偏移量从 0 开始计数),那么下次启动消费者时,它将忽略前 10 条消息。

    您可以查看 Kafka 内部主题 __consumer_offsets 以查看为每个消费者组存储的内容:主题、分区、偏移量(……等等)。

    【讨论】:

      猜你喜欢
      • 2018-09-22
      • 2017-02-06
      • 2021-05-22
      • 2020-09-03
      • 2018-07-02
      • 2021-01-15
      • 2017-06-22
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多