【问题标题】:Kafka + Spring Batch Listener Flush BatchKafka + Spring Batch Listener Flush Batch
【发布时间】:2018-10-26 12:09:08
【问题描述】:

使用 Kafka 代理:1.0.1 spring-kafka: 2.1.6.RELEASE

我正在使用具有以下设置的批处理消费者:

// Other settings are not shown..
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

我使用spring监听器的方式如下:

 @KafkaListener(topics = "${topics}", groupId = "${consumer.group.id}")
    public void receive(final List<String> data,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitions,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) Set<String> topics,
                        @Header(KafkaHeaders.OFFSET) final List<Long> offsets) { // ......code... }

我总是发现一些消息保留在批处理中,而我的听众没有收到。似乎如果剩余的消息小于批量大小,则不会消耗它(可能在内存中并发布到我的侦听器)。有什么方法可以设置在一段时间后自动刷新批处理,以避免消息不被刷新? 处理批量消费者这种情况的最佳方法是什么?

【问题讨论】:

  • &gt;I always find the a few messages remain in the batch 不清楚你的意思。打开 DEBUG 日志记录以查看消费者活动。如果仍然没有达到您的预期,请发布日志并准确解释您的意思。
  • 好的 - 例如,如果我的总数据大小是 230,我的批处理大小是 100。我只看到 200 条消息被消耗(或者更确切地说是进入我的侦听器)。剩余的 30 个不被消耗。我的问题是,它是在等待批处理填满(还有 70 条消息)然后刷新吗?或者有什么方法可以在“n”秒后自动刷新批次?在这种情况下,应用程序应该如何使用批处理消费者?
  • 在消费者端,如果消息已经在主题中,只要你没有增加fetch.max.wait.msfetch.min.bytes,你应该得到剩下的30条。在生产者方面,只要您没有增加linger.ms,消息就会立即消失。
  • 我没有在我的 application.properties 文件中添加(或覆盖)这些属性,并且您提到的所有属性都没有在 DefaultKafkaConsumerFactory 中设置。所以他们只使用默认值。但还是不行。此外,调试日志会在一段时间后显示它正在轮询并获取 0 条记录(并且这会一遍又一遍地重复)。我在与消息批处理相关的日志记录中找不到任何明显的内容。

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

我刚刚进行了一次测试,没有任何问题......

@SpringBootApplication
public class So50370851Application {

    public static void main(String[] args) {
        SpringApplication.run(So50370851Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            for (int i = 0; i < 230; i++) {
                template.send("so50370851", "foo" + i);
            }
        };
    }

    @KafkaListener(id = "foo", topics = "so50370851")
    public void listen(List<String> in) {
        System.out.println(in.size());
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50370851", 1, (short) 1);
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.type=batch

100
100
30

此外,调试日志会在一段时间后显示它正在轮询并获取 0 条记录(并且会一遍又一遍地重复)。

这意味着问题出在发送方。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-20
    • 2022-10-24
    • 2023-01-13
    • 2017-07-23
    • 1970-01-01
    • 2019-11-03
    相关资源
    最近更新 更多