【问题标题】:Consuming kafka batch for multiple partitions为多个分区消耗 kafka 批处理
【发布时间】:2021-02-01 23:29:37
【问题描述】:

我了解 Kafka 可以批量提取事件。 我正在尝试理解这种情况:

  • 我有一个主题的 4 个分区
  • 我有 1 个消费者,Kafka 将所有 4 个分区都分配给它。
  • 假设每批 Kafka 客户端从 Kafka 拉取 5 条消息。

我在这里想要了解的是,如果 1 个批次中的事件都来自同一个分区,然后循环到下一个分区批次。还是批处理本身已经包含来自不同分区的事件?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api kafka-partition


    【解决方案1】:

    我无法给你一个准确的答案,但我觉得它很有趣,可以测试一下。

    为此,我创建了一个具有四个分区的主题,并使用kafka-producer-perf-test 命令行工具在该主题中生成了一些消息。由于性能测试工具根本不会创建任何键,因此消息会以循环方式写入主题分区。

    kafka-producer-perf-test --topic test --num-records 1337 --throughput -1 --record-size 128 --producer-props key.serializer=org.apache.kafka.common.serialization.StringSerializer --producer-props value.serializer=org.apache.kafka.common.serialization.StringSerializer --producer-props bootstrap.servers=localhost:9092
    

    之后,我使用配置 max_poll_records=5 创建了一个简单的 KafkaConsumer 来匹配您的问题。消费者只需打印出所消费的每条消息的偏移量和分区:

    Integer counter = 0;
    
    // consume messages with `poll` call and print out results
    try(KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(settings)) {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            System.out.printf("Batch = %d\n", counter);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, partition = %d\n", record.offset(), record.partition());
            }
            counter += 1;
        }
    }
    

    结果,回答您的问题是,消费者在移动到另一个分区之前尝试从一个分区获取尽可能多的数据。仅在来自分区1的所有消息都被消耗但未达到max_poll_records 5的限制的情况下,它从分区2添加了两条消息。

    这里有一些印刷品可以更好地理解。

    Batch = 0
    offset = 310, partition = 0
    offset = 311, partition = 0
    offset = 312, partition = 0
    offset = 313, partition = 0
    offset = 314, partition = 0
    
    Batch = 1
    offset = 315, partition = 0
    offset = 316, partition = 0
    offset = 317, partition = 0
    offset = 318, partition = 0
    offset = 319, partition = 0
    
    # only offsets with partition 0
    
    Batch = 45
    offset = 525, partition = 0
    offset = 526, partition = 0
    offset = 527, partition = 0
    offset = 528, partition = 0
    offset = 529, partition = 0
    Batch = 46
    offset = 728, partition = 1
    offset = 729, partition = 1
    offset = 730, partition = 1
    offset = 731, partition = 1
    offset = 732, partition = 1
    
    # only offsets with partition 1
    
    Batch = 86
    offset = 928, partition = 1
    offset = 929, partition = 1
    offset = 930, partition = 1
    offset = 931, partition = 1
    offset = 932, partition = 1
    Batch = 87
    offset = 465, partition = 2
    offset = 466, partition = 2
    offset = 933, partition = 1
    offset = 934, partition = 1
    offset = 935, partition = 1
    Batch = 88
    offset = 467, partition = 2
    offset = 468, partition = 2
    offset = 469, partition = 2
    offset = 470, partition = 2
    offset = 471, partition = 2
    
    ## and so on
    

    【讨论】:

    • 这基本上意味着没有任何东西,正如你所说,它只是“尝试”从单个分区中提取尽可能多的内容,这很高兴知道,尽管实际上我们可能不应该t 取决于我们的批次。附言很高兴知道我们有这个 pref 工具:)
    • 可能和性能有关,因为涉及到多个RTT,所以每批次取多个分区没有意义。
    猜你喜欢
    • 1970-01-01
    • 2017-08-15
    • 2018-11-21
    • 2019-12-15
    • 2016-04-18
    • 1970-01-01
    • 1970-01-01
    • 2019-06-21
    • 1970-01-01
    相关资源
    最近更新 更多