【问题标题】:How to do content filtering with Apache Kafka?如何使用 Apache Kafka 进行内容过滤?
【发布时间】:2026-02-21 13:25:01
【问题描述】:

我有一个名为 mytopic 的主题。该主题有一个生产者和两个消费者。我需要做的是根据生产者的前缀过滤生产者产生的消息。例如,如果一条消息以“a”前缀开头,那么只有第一个消费者必须接受它。如果它以'b'前缀开头,那么只有第二个消费者必须接受它。

我做了很多搜索,我发现过滤来自一个主题的消息,然后将它们过滤后发送到不同的主题。但如上所述,我需要对一个主题进行过滤。我如何在 Kafka 中做到这一点?

【问题讨论】:

  • 为什么不能使用两个主题?无论如何,您应该看看 Kafka Streams。
  • @BenWatson 是的,我看过了,它提供主题之间的流式传输。我会用它做另一件事。谢谢。
  • 也许你可以用 KSQL 编写你的过滤器。 github.com/confluentinc/ksql

标签: java apache-kafka


【解决方案1】:

这很简单,不需要写回不同的主题。
“2 个消费者”是指 2 个消费者组或 1 个消费组中的 2 个消费者线程?
我会说这两个。

如果是 1 个消费者组中的 2 个消费者线程,则可以使用消息 'Key' 字段。
Kafka 将相同的 'Key' 消息发送到相同的 'Partition'。
例如,一条消息前缀'a',键域'a',b消息前缀'b',键域'b',则Kafka将a消息发送到'Partition-1',b消息发送到'Partition-2 '。 消费者线程A可以订阅指定的'mytopic-Partition-1',线程B可以订阅'mytopic-Partition-2'使用类'org.apache.kafka.clients.consumer.KafkaConsumer'中的'assign'方法'。

如果是2个消费者组,只需订阅主题并在代码中过滤即可。如果不满意,请使用上述相同的方法。
技巧是向特定的“分区”发送特定的前缀消息。强>
如果你真的想要过滤器,也许你可以使用 Kafka Connect 插件。

【讨论】:

  • 这提供了有关上述答案的更多详细信息,谢谢。
  • 但是两个不同的键可能最终在同一个分区中。如果您拥有的密钥多于分区数量,我认为这不是一个安全的选择。
  • 我知道你的意思是两个不同的键具有相同的哈希值,但概率非常低,除非你有太多不同的键。
【解决方案2】:

一旦您使用 java 流 和特定于消费者的过滤逻辑获取记录 过滤 他们,就允许两个消费者使用所有数据。

简而言之,我的意思是按原样获取数据并使用 java 代码过滤它们,而不是在 Kafka 级别进行。

更新:

如果要在Kafka级别进行过滤,可以使用partitions,在向kafka topic发送消息时,向Partition发送前缀为'a'的消息-1,以及带有前缀“b”的消息到 Partition-2

现在,在消费时,只需在各个消费者中消费该特定分区

【讨论】:

  • 感谢您的回答,但我已经这样做了。我的意思是我们可以在到达消费者之前过滤去往消费者的记录吗?
  • 你有什么例子吗?我不知道该怎么做?
  • 这里的代码会很好;)