【问题标题】:Reset kafka LAG (change offset) within consumer group in Kafka-python在 Kafka-python 的消费者组中重置 kafka LAG(更改偏移量)
【发布时间】:2018-10-06 07:32:10
【问题描述】:

我在使用 kafka-consumer-groups.sh 工具 How to change start offset for topic? 重置 LAG 的地方找到了这个,但我需要在应用程序中重置它。我找到了这个例子,但它似乎没有重置它。 kafka-python read from last produced message after a consumer restart例子

    consumer = KafkaConsumer("MyTopic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port),
                             enable_auto_commit=False,
                             group_id="MyTopic.group")
    consumer.poll()
    consumer.seek_to_end()
    consumer.commit()

    ... continue on with other code...

运行bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group MyTopic.group --describe 仍然显示两个分区都有滞后。如何让当前偏移量“快进”到最后?

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST             CLIENT-ID
MyTopic         0          52110           66195           14085           kafka-python-1.4.2-6afb6901-c651-4534-a482-15358db42c22 /Host1  kafka-python-1.4.2
MyTopic         1          52297           66565           14268           kafka-python-1.4.2-c70e0a71-7d61-46a1-97bc-aa2726a8109b /Host2  kafka-python-1.4.2

【问题讨论】:

    标签: apache-kafka kafka-consumer-api kafka-python


    【解决方案1】:

    为了“快进”消费者组的偏移量,意味着清除 LAG,您需要创建将加入同一组的新消费者。
    控制台命令是:

    kafka-console-consumer.sh --bootstrap-server <brokerIP>:9092 --topic <topicName> --consumer-property group.id=<groupName>
    

    您可以并行运行命令来查看您所描述的滞后,您将看到滞后已消除。

    【讨论】:

    • 我不明白添加另一个消费者最终会如何导致它重置?我希望以编程方式重置 LAG。
    【解决方案2】:

    你可能想要这个:

    def consumer_from_offset(topic, group_id, offset):
        """return the consumer from a certain offset"""
        consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id=group_id)
        tp = TopicPartition(topic=topic, partition=0)
        consumer.assign([tp])
        consumer.seek(tp, offset)
    
        return consumer
    
    consumer = consumer_from_offset('topic', 'group', 0)
    for msg in consumer:
        # it will consume the msg beginning from offset 0
        print(msg)
    
    

    【讨论】:

      猜你喜欢
      • 2018-02-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-22
      • 1970-01-01
      • 2019-05-01
      • 1970-01-01
      • 2018-05-03
      相关资源
      最近更新 更多