【问题标题】:Kafka partition assignment protocolKafka 分区分配协议
【发布时间】:2018-06-21 19:07:58
【问题描述】:

我正在编写集成测试,以使用 confluent-dotnet(包装 librdkafka)验证 kafka 生产者消费者配置。

在一个测试中,我想启动一个消费者,他将从现有分区的末尾开始,然后从生产者发布一条消息并断言消费者只消费了一条消息。

现在消费者的启动是异步的(即:如果你调用订阅然后直接发布,最后开始的消费者不会收到它)。 在没有竞争条件的情况下编写此测试的适当方法是什么?一旦我完成了“Partition.Assign”,消费者偏移量是否已经确定?我不确定,因为 OnPartitionAssigned 的回调只包含一个 TopicPartition 而没有偏移量。

在一个相关的问题上,似乎有时,在没有(afaict)任何 kafka 节点故障的情况下,我收到的分配的分区多于分区的数量(即:我被分配了两次相同的分区),这是怎么回事可能吗?

【问题讨论】:

    标签: c# apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    设置一个 OnPartitionEof 委托,它将在消费者到达分区末尾时被调用,当它被调用时,您可以确定消费者确实在为给定分区获取消息并且您可以开始对其进行生产。

            consumer.OnPartitionEOF += (_, end)
                => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
    

    【讨论】:

    • 谢谢,如果使用 auto.reset 属性,这可能是一个好方法。目前我正在 OnPartitionAssigned 中做某事,由于来自“QueryWatermarkOffsets”和/或“QueryCommittedOffsets”的信息,我明确地从给定的偏移量开始消费。
    猜你喜欢
    • 1970-01-01
    • 2018-10-08
    • 1970-01-01
    • 1970-01-01
    • 2020-09-09
    • 1970-01-01
    • 1970-01-01
    • 2019-08-14
    • 1970-01-01
    相关资源
    最近更新 更多