【问题标题】:Kafka 0.9 new consumer api --- how to just watch consumer offsetsKafka 0.9 新的消费者 api --- 如何只观察消费者偏移量
【发布时间】:2016-02-12 20:59:49
【问题描述】:

我正在尝试使用 Java API 监控给定组的消费者偏移量。我创建了一个额外的消费者,它不订阅任何主题,只是调用consumer.committed(topic) 来获取偏移信息。这种工作,但是:

为了测试,我只使用一个真正的消费者(即订阅主题的消费者)。当我使用 close() 将其关闭并稍后重新启动时,订阅和第一次消费消息之间需要 27 秒,尽管我使用的是 poll(1000)

我猜这与重新平衡可能被非订阅消费者混淆有关。这可能吗?有没有更好的方法用Java API监控偏移量(我知道命令行工具,但需要使用API​​)。

【问题讨论】:

    标签: java kafka-consumer-api


    【解决方案1】:

    有不同的方法来检查主题的偏移量,这取决于你想要它的目的,除了你上面描述的“承诺”之外,这里还有两个选项:

    1) 如果你想知道消费者在下次线程启动时开始从代理获取数据的偏移id,那么你必须使用“位置”作为

    long offsetPosition;
    TopicPartition tPartition = new TopicPartition(topic,partitionToReview);
        offsetPosition = kafkaConsumer.position(tPartition);
        System.out.println("offset of the next record to fetch is : " + position);
    

    2) 在从 kafkaConsumer 执行轮询后,从 ConsumerRecord 对象调用“offset()”方法

    Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator();
    while(it.hasNext()){
    ConsumerRecord<byte[],byte[]> record = it.next();
    System.out.println("offset : " + record.offset());
    }
    

    【讨论】:

    • 这两种方法只对订阅的消费者有效。关键是我有一个消费者可以监控。它不会参与消费,所以不能使用这些方法。
    【解决方案2】:

    找到它:监控消费者增加了混乱,但不是罪魁祸首。最后虽然有点出乎意料但很容易理解(至少对我来说):

    session.timeout.ms 的默认值为 30 秒。当消费者消失时,最多需要 30 秒才能宣布其死亡并重新平衡工作。为了进行测试,我停止了我拥有的单个消费者,等待三秒钟并重新启动一个新消费者。然后这需要 27 秒才开始,填补了 30 秒的超时。

    我本来希望一个单独的消费者启动不会等待超时到期,而是开始“重新平衡”,即立即获取工作。似乎超时必须在重新平衡工作之前到期,即使只有一个消费者。

    为了更快地通过测试,我更改了配置,为消费者使用较低的session.timeout.ms,为代理使用group.min.session.timeout.ms

    总结:使用不订阅任何主题的消费者来监控偏移量可以正常工作,并且似乎不会干扰重新平衡过程。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-09-09
      • 1970-01-01
      • 1970-01-01
      • 2017-07-22
      • 1970-01-01
      • 2019-04-04
      • 2016-06-02
      • 1970-01-01
      相关资源
      最近更新 更多