【问题标题】:Batch consumer camel kafka批量消费骆驼卡夫卡
【发布时间】:2022-02-24 07:06:13
【问题描述】:

尽管遵循此处发布的示例,但我无法与 kafka 骆驼消费者一起批量阅读。我需要对我的生产者进行更改,还是我的消费者配置最有可能出现问题?

有问题的应用程序利用kafka camel component 从休息端点摄取消息,验证它们,并将它们放在一个主题上。然后,我有一个单独的服务从主题中使用它们并将它们保存在时间序列数据库中。

一次生成和使用一条消息,但数据库希望批量使用和提交消息以获得最佳性能。在不接触生产者的情况下,我尝试调整消费者以匹配此问题答案中的示例:

How to transactionally poll Kafka from Camel?

我不确定这些消息会如何显示,所以现在我只是记录它们:

    from(kafkaReadingConsumerEndpoint).routeId("rawReadingsConsumer").process(exchange -> {
        // simple approach to generating errors
        String body = exchange.getIn().getBody(String.class);
        if (body.startsWith("error")) {
            throw new RuntimeException("can't handle the message");
        }
        log.info("BODY:{}", body);
    }).process(kafkaOffsetManager);

但消息似乎仍然是一次收到一条,没有批量阅读。

我的消费者配置是这样的:

  kafka:
    host: myhost
    port: myport
    consumer:
      seekTo: beginning
      maxPartitionFetchBytes: 55000
      maxPollRecords: 50
      consumerCount: 1
      autoOffsetReset: earliest
      autoCommitEnable: false
      allowManualCommit: true
      breakOnFirstError: true

我的配置是否需要工作,或者我需要对生产者进行更改才能使其正常工作?

【问题讨论】:

  • 如果我在该主题上排队 50 条消息,则输出为 50 条以下日志消息:BODY:{"readingTimestampEvent":1645468048841,"reading":{"utcOffset":"0" ,"data":{"temp":"25"},"profileId":"252c8574-a9df-4490-bb48-ffc11f6e535c","sensorType":"my-iot-sensor","operationId":"4f7ecee7- 0a19-41ac-8b28-b77df3223862","sensorId":"41804b07-6f6f-40fa-a506-3625cac11b8e","timestamp":1645470318721}}

标签: java apache-kafka apache-camel


【解决方案1】:

在最底层,KafkaConsumer#poll 方法将返回一个Iterator<ConsumerRecord>;没有办法。

我对 Camel 没有深入的经验,但为了获得“一批”记录,您需要一些中间集合来“排队”您最终要发送到下游的数据“收集消费者”的过程。然后你将需要一些“切换”处理器,上面写着“等待,处理这批”或“继续填充这批”。

就数据库而言,该过程正是 Kafka Connect JDBC Sink 对 batch.size 配置所做的。

【讨论】:

    猜你喜欢
    • 2019-06-15
    • 2016-06-17
    • 1970-01-01
    • 2015-11-22
    • 1970-01-01
    • 2020-12-22
    • 1970-01-01
    • 2023-03-29
    • 2021-09-07
    相关资源
    最近更新 更多