【发布时间】:2022-02-24 07:06:13
【问题描述】:
尽管遵循此处发布的示例,但我无法与 kafka 骆驼消费者一起批量阅读。我需要对我的生产者进行更改,还是我的消费者配置最有可能出现问题?
有问题的应用程序利用kafka camel component 从休息端点摄取消息,验证它们,并将它们放在一个主题上。然后,我有一个单独的服务从主题中使用它们并将它们保存在时间序列数据库中。
一次生成和使用一条消息,但数据库希望批量使用和提交消息以获得最佳性能。在不接触生产者的情况下,我尝试调整消费者以匹配此问题答案中的示例:
How to transactionally poll Kafka from Camel?
我不确定这些消息会如何显示,所以现在我只是记录它们:
from(kafkaReadingConsumerEndpoint).routeId("rawReadingsConsumer").process(exchange -> {
// simple approach to generating errors
String body = exchange.getIn().getBody(String.class);
if (body.startsWith("error")) {
throw new RuntimeException("can't handle the message");
}
log.info("BODY:{}", body);
}).process(kafkaOffsetManager);
但消息似乎仍然是一次收到一条,没有批量阅读。
我的消费者配置是这样的:
kafka:
host: myhost
port: myport
consumer:
seekTo: beginning
maxPartitionFetchBytes: 55000
maxPollRecords: 50
consumerCount: 1
autoOffsetReset: earliest
autoCommitEnable: false
allowManualCommit: true
breakOnFirstError: true
我的配置是否需要工作,或者我需要对生产者进行更改才能使其正常工作?
【问题讨论】:
-
如果我在该主题上排队 50 条消息,则输出为 50 条以下日志消息:BODY:{"readingTimestampEvent":1645468048841,"reading":{"utcOffset":"0" ,"data":{"temp":"25"},"profileId":"252c8574-a9df-4490-bb48-ffc11f6e535c","sensorType":"my-iot-sensor","operationId":"4f7ecee7- 0a19-41ac-8b28-b77df3223862","sensorId":"41804b07-6f6f-40fa-a506-3625cac11b8e","timestamp":1645470318721}}
标签: java apache-kafka apache-camel