【问题标题】:apache spark streaming - kafka - reading older messagesapache spark streaming - kafka - 阅读旧消息
【发布时间】:2015-02-04 02:03:25
【问题描述】:

我正在尝试使用火花流读取来自 Kafka 的旧消息。但是,我只能在实时发送消息时检索它们(即,如果我在 spark 程序运行时填充新消息 - 然后我会收到这些消息)。

我正在更改我的 groupID 和 consumerID 以确保 zookeeper 不只是不发送它知道我的程序以前见过的消息。

假设 spark 将 zookeeper 中的偏移量视为 -1,它不应该读取队列中的所有旧消息吗?我只是误解了 kafka 队列的使用方式吗?我对火花和卡夫卡很陌生,所以我不能排除我只是误解了一些东西。

package com.kibblesandbits

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import net.liftweb.json._

object KafkaStreamingTest {

  val cfg = new ConfigLoader().load
  val zookeeperHost = cfg.zookeeper.host
  val zookeeperPort = cfg.zookeeper.port
  val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

  implicit val formats = DefaultFormats 

  def parser(json: String): String = {
    return json
}

def main(args : Array[String]) {
  val zkQuorum = "test-spark02:9092"

  val group = "myGroup99"
  val topic = Map("testtopic" -> 1)
  val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
  val ssc = new StreamingContext(sparkContext, Seconds(3))
  val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
  var gp = json_stream.map(_._2).map(parser)

  gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
  ssc.start()
}

运行此程序时,我将看到以下消息。所以我相信这不仅仅是因为设置了偏移量而没有看到消息。

14/12/05 13:34:08 信息 ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] 为分区添加了 fetcher ArrayBuffer([[testtopic,0], initOffset -1 到代理 id:1,host:test-spark02.vpc,port:9092] , [[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 到代理 id:1,host:test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 到代理 id:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1 到代理 id:1,host:test-spark02.vpc,port:9092])

然后,如果我填充 1000 条新消息——我可以看到这 1000 条消息保存在我的临时目录中。但是我不知道如何阅读现有的消息,这些消息应该在(此时)数万。

【问题讨论】:

    标签: apache-spark apache-zookeeper apache-kafka spark-streaming


    【解决方案1】:

    KafkaUtils 上使用替代工厂方法,让您向 Kafka 消费者提供配置:

    def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
          ssc: StreamingContext,
          kafkaParams: Map[String, String],
          topics: Map[String, Int],
          storageLevel: StorageLevel
        ): ReceiverInputDStream[(K, V)]
    

    然后使用您的 kafka 配置构建地图并将参数“kafka.auto.offset.reset”设置为“smallest”:

    val kafkaParams = Map[String, String](
          "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
          "zookeeper.connection.timeout.ms" -> "10000",
          "kafka.auto.offset.reset" -> "smallest"
    )
    

    将该配置提供给上面的工厂方法。 "kafka.auto.offset.reset" -> "smallest" 告诉消费者从你主题中的最小偏移量开始。

    【讨论】:

    • 仍然没有为我工作,还有其他可能的方法吗?我有 10k 条消息驻留在主题中,但只有当我在主题中收到新消息时才能检索它们。如何获取已经存储在kafka topic中的数据?
    • "auto.offset.reset" -> "smallest" 为我工作。此外,根据文档,cwiki.apache.org/confluence/display/KAFKA/FAQ 如果您使用 0.9 版本,它应该是“最早的”
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-12-30
    • 2018-06-25
    • 2017-10-30
    • 1970-01-01
    • 2020-08-04
    • 1970-01-01
    • 2018-01-09
    相关资源
    最近更新 更多