【问题标题】:kafka-python read from last produced message after a consumer restartkafka-python 在消费者重启后从最后产生的消息中读取
【发布时间】:2017-08-31 10:18:14
【问题描述】:

我正在使用kafka-python 来使用来自 kafka 队列(kafka 版本 0.10.2.0)的消息。特别是我使用KafkaConsumer 类型。 如果消费者停止并在一段时间后重新启动,我想从最新产生的消息重新启动,即删除在消费者关闭期间产生的所有消息。 我怎样才能做到这一点?

谢谢

【问题讨论】:

    标签: python apache-kafka kafka-python


    【解决方案1】:

    你不会以seekToEnd()到日志结尾。

    请记住,您首先需要订阅一个主题,然后才能搜索。此外,订阅是懒惰的。因此,您也需要先添加一个“虚拟投票”,然后才能进行搜索。

    consumer.subscribe(...)
    consumer.poll() // dummy poll
    consumer.seekToEnd()
    
    // now enter your regular poll-loop
    

    【讨论】:

      【解决方案2】:

      谢谢,

      有效!

      这是我的代码的简化版本:

      consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
      #dummy poll
      consumer.poll()
      #go to end of the stream
      consumer.seek_to_end()
      #start iterate
      for message in consumer:
          print(message)
      
      consumer.close()
      

      The documentation 声明 poll() 方法与迭代器接口不兼容,我猜这是我在脚本末尾的循环中使用的那个。但是从最初的测试来看,这段代码看起来可以正常工作。

      使用它安全吗?还是我误解了文档?

      谢谢

      【讨论】:

        【解决方案3】:

        在回答中回答您的问题:

        据我了解,当您执行consumer.poll() 时,会返回一个字典。因此,当我想查询信息时,我使用循环遍历字典。

        consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
        messages = consumer.poll()
        data = []
        for msg in messages:
            for value in messages[msg]:
               #Add just the values to the list
               data.append(value[6])
        

        我相信您正在做的是使用 consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True) 获取迭代器,然后使用

        遍历迭代器
        #start iterate
        for message in consumer:
            print(message)
        

        看起来您实际上并不仅仅从民意调查中获得了 500 个结果。您可以通过将 max_poll_records=5 添加到您的 KafkaConsumer 配置来确认这一点。然后,当您运行代码时,如果打印出超过 5 条消息,您就可以知道您没有使用投票功能。

        希望有帮助!

        【讨论】:

          【解决方案4】:

          这是一种将投票返回的所有消息放在一个列表中的便捷方法:

          while True:
            messages = [] # Store all messages
            crs = [] # Store all consumer records
            tpd = consumer.poll(timeout_ms=60000, max_records=1)
            [ crs.extend(tp) for tp in tpd.values() ] # List of cr's
            [ messages.extend([json.loads(cr.value)]) for cr in crs ]
            print messages
          

          【讨论】:

          • 请注意这里的消息是 json 但可以跳过加载。
          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2020-11-02
          • 2023-01-21
          • 2018-12-01
          • 2018-01-04
          • 2019-07-01
          • 1970-01-01
          • 2017-11-09
          相关资源
          最近更新 更多