【问题标题】:kafka consumer batch listener polling records based on sized of the messagerkafka consumer batch listener 根据messenger大小轮询记录
【发布时间】:2023-01-13 09:44:11
【问题描述】:
我希望消费者根据消息的长度/大小批量消费消息批处理意味着 1kb 的批处理和最大 poll.record 为 100。最多轮询 100 条消息或 1kb 的消息。
max.poll.records =100
如何在批处理中设置最大 1kb 的消息?
我试过 max.partition.fetch.bytes=1024 但我收到 100 条消息,即使消息批次大于 1024 字节。如何控制这种行为?
如何设置最多 1kb 的消息,每批最多 100 条消息?
【问题讨论】:
标签:
java
spring-boot
kafka-consumer-api
spring-kafka
【解决方案1】:
Consumer中的配置,比如max.partition.fetch.bytes或fetch.max.bytes都是不是绝对最大值.
记录是消费者分批取的,如果取的第一个非空分区中的第一个记录批大于这个值,仍然会返回记录批,保证消费者能取得进展
kafka 推荐通过message.max.bytes(代理配置)或max.message.bytes(主题配置)来定义。
您可以在这里找到有关 kafka 配置的所有详细信息:max.partition.fetch.bytes
希望有所帮助。
【解决方案2】:
您可以在创建消费者时通过设置 max.partition.fetch.bytes 配置属性来设置每批的最大字节数。但是,此属性控制消费者将在单个请求中从单个分区获取的最大字节数,并且不保证消费者将以特定大小的批次接收消息。
要根据消息批次的长度/大小和每批最多 100 条消息来实现批量消费消息的所需行为,您可以创建一个自定义消费者来跟踪消息的数量和消息批次的大小并停止一旦达到其中一个限制就获取消息。
是这样的:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
long maxBatchSizeBytes = 1024;
int maxBatchSizeRecords = 100;
long currentBatchSizeBytes = 0;
int currentBatchSizeRecords = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
currentBatchSizeBytes += record.serializedValueSize();
currentBatchSizeRecords++;
if (currentBatchSizeBytes > maxBatchSizeBytes || currentBatchSizeRecords > maxBatchSizeRecords) {
consumer.commitSync();
currentBatchSizeBytes = 0;
currentBatchSizeRecords = 0;
break;
}
// process the message
}
}
请注意,上面的代码 sn-p 是一个示例,它没有任何错误处理机制,您应该根据您的要求添加适当的错误处理机制。