【问题标题】:How to consume message from Kafka topic using some filter?如何使用一些过滤器从 Kafka 主题中消费消息?
【发布时间】:2023-03-31 16:35:01
【问题描述】:

我正在用 Java 编写一个简单的 Kafka 生产者-消费者程序,我在其中生成了如下数据:

  • 键:“a” 值:“{25,223,465}”

  • 键:“a” 值:“{26,323,56}”

  • 键:“a” 值:“{62,256,652}”

  • 键:“a” 值:“{26,227,42}”

  • 键:“b” 值:“{4352,234,65342}”

  • 键:“b” 值:“{243,22347,434}”

我可以使用 consumer.poll(10000) 来消费消息,但现在我想消费数据,比如 Kafka 主题中有多少 a 记录和多少 b 记录。

如果我将此与 SQL 联系起来

select count(*) from 'mytopic' where key='a'

select count(*) from 'mytopic' where key='b'

如果可能,请提供java代码

【问题讨论】:

  • 听起来你可能对使用 ksqlDB 感兴趣
  • @OneCricketeer 是的,也许吧,但不知道如何在 Java 代码中使用它。你能提供一些来源吗
  • 我相信您能够在其网站上查找和搜索 ksqlDB 示例... KsqlDB 使用 sql 语法,但它具有可通过 Java 使用的 REST API。但是,正如所回答的那样,普通的 Kafka 消费者并不意味着以您要求进行聚合的方式使用 - 您需要另一个工具
  • @OneCricketeer Thx

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

Kafka 不能以这种方式工作,消费者只需从分区中(或从头开始)中最新提交的偏移量读取并按顺序读取所有消息。 您可以做的是在您的应用程序中进行过滤。 出于您的目的,您可以使用基于 Kafka Streams API 的应用程序,而不是仅使用消费者 API 编写应用程序,该应用程序为您提供 DSL 来执行诸如映射、过滤等操作......真的很容易。 更多信息在这里:

https://kafka.apache.org/documentation/streams/

【讨论】:

    猜你喜欢
    • 2019-06-23
    • 1970-01-01
    • 2018-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-27
    相关资源
    最近更新 更多