【问题标题】:Spark - Get earliest and latest offset of Kafka without opening streamSpark - 在不打开流的情况下获取 Kafka 的最早和最新偏移量
【发布时间】:2017-11-29 01:16:58
【问题描述】:

我目前正在使用spark-streaming-kafka-0-10_2.11 将我的 spark 应用程序与 kafka 队列连接起来。对于 Streams,一切正常。但是对于一个特定的场景,我只需要一次卡夫卡队列的全部内容——为此我得到了更好地使用KafkaUtils.createRDDSparkStreaming: Read Kafka Stream and provide it as RDD for further processing)的建议

但是对于spark-streaming-kafka-0-10_2.11,我无法弄清楚如何为我的 Kafka 主题获取最早和最新的偏移量,这是创建我必须使用 createRDD 方法的 Offset-Range 所需的。

在不打开流的情况下获得这些偏移量的推荐方法是什么?任何帮助将不胜感激。

【问题讨论】:

  • 如果需要从 Kafka 生成单个批次,为什么要使用 Spark Streaming?
  • @YuvalItzchakov 我的应用程序中的其他几个用例需要火花流。所以我想我也会使用它的 API 来接收单个 RDD。您建议的实现我想做的方法是什么?如果使用另一种方式连接到 Kafka 是更好的做法,我很乐意以另一种方式进行。
  • 您可以运行批处理作业,使用它的驱动程序手动连接到 Kafka,如果您希望使用分布式集群计算数据,您可以使用 @987654327 将其包装在 RDD 中@
  • 当我搜索connecting spark with kafka时,我只得到streaming-kafka的结果-@YuvalItzchakov你有什么提示在哪里寻找你建议的驱动方法吗?尽管我认为流式卡夫卡必须有某种方式,因为它经常被建议。
  • 我对这个问题的解决方案非常感兴趣。不幸的是,@rukavitsya 给出的一个答案描述了不同问题的解决方案。

标签: scala apache-spark apache-kafka


【解决方案1】:

在阅读了几个讨论之后,我能够从特定分区获得最早或最新的偏移量:

val consumer = new SimpleConsumer(host,port,timeout,bufferSize,"offsetfetcher");
val topicAndPartition = new TopicAndPartition(topic, initialPartition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime,1)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

return offsets.head

但是,KafkaUtils.createRDD 方法不知道如何在 kafka_consumer.sh CLI 命令中复制“from_beginning”的行为。

【讨论】:

  • 您可以打开kafka_consumer.sh ,检查它是否只是针对Java 类进行的,然后查看Kafka 源代码:)
  • 这是真的!我已经做到了,它确实有帮助!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-06-27
  • 2021-08-09
  • 2020-01-18
  • 2022-06-13
  • 1970-01-01
  • 2016-05-27
  • 1970-01-01
相关资源
最近更新 更多