【问题标题】:Golang Kafka not consuming all messages offsetnewestGolang Kafka没有消耗所有消息offsetnewest
【发布时间】:2018-10-15 11:08:55
【问题描述】:

第一批:- 我正在尝试从 100 个平面文件中提取数据并加载到一个数组中,并将它们作为字节数组一一插入到 kafka 生产者中。

第二批:- 我正在消费 kafka 消费者,然后将它们插入 NoSQL 数据库。

我在 Kafka 的 shopify sarama golang 包的配置文件中使用 Offsetnewset。

我可以接收并向 kafka 插入消息,但在消费时我只收到第一条消息。因为我在 sarama 配置中给出了最新的偏移量。 我怎样才能在这里获取所有数据。

【问题讨论】:

    标签: go apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    如果没有任何代码或更深入地解释 kafka 的配置方式(即:主题、分区等),很难说出一些事情,所以我想到的快速检查很少:

    1. 假设您开始使用 OffsetNewest 集合在开始生产之前,可能发生的一件事是您没有从该主题的所有分区中消费,关于 sarama 文档,您必须消费通过创建 PartitionConsumers 显式地创建每个分区。来自https://godoc.org/github.com/Shopify/sarama#Consumer中的示例:

      partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
      if err != nil {
          panic(err)
      }
      
      ...
      
      consumed := 0
      ConsumerLoop:
      for {
          select {
          case msg := <-partitionConsumer.Messages():
              log.Printf("Consumed message offset %d\n", msg.Offset)
              consumed++
          case <-signals:
              break ConsumerLoop
          }
      }
      
    2. 事实上,您是在产生所有事件之后开始消费的,因此,读取它们的指针不是 OffsetNewest,而是 OffsetOldest。

      李>

    很抱歉无法为您提供更有用的答案,但也许如果您粘贴一些代码或提供更多详细信息,我们可以提供更多帮助。

    【讨论】:

      猜你喜欢
      • 2023-01-07
      • 2018-04-11
      • 1970-01-01
      • 1970-01-01
      • 2023-02-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-04-16
      相关资源
      最近更新 更多