【问题标题】:Implement filering for kafka messages为 kafka 消息实现归档
【发布时间】:2017-03-15 14:03:43
【问题描述】:

我最近开始使用 Kafka,并针对一些用例评估了 Kafka。

如果我们想提供根据消息内容为消费者(订阅者)过滤消息的功能,那么最好的方法是什么?

假设生产者公开了一个名为“交易”的主题,该主题具有不同的交易详细信息,例如市场名称、创建日期、价格等。

一些消费者对特定市场的交易感兴趣,而另一些消费者对特定日期之后的交易感兴趣等(基于内容的过滤)

由于无法在代理端进行过滤,因此实现以下情况的最佳方法是:

  1. 如果过滤条件特定于消费者。我们应该使用 消费者拦截器(尽管建议使用拦截器进行日志记录 根据文档的用途)?
  2. 如果过滤标准(基于内容的过滤)在消费者中很常见,应该采用什么方法?

监听主题并在本地过滤消息并写入新主题(使用拦截器或流)

【问题讨论】:

  • 到目前为止你做了什么?
  • 不确定您的目标是什么。如果这是目标,您不能在代理端进行任何过滤。或者你的意思是消费一个主题,过滤它,然后写回一个新主题?你能详细说明你的问题吗?
  • @MatthiasJ.Sax,我已经更新了帖子并提供了示例。
  • @MahendraGunawardena,编辑帖子了解更多详情

标签: apache-kafka kafka-consumer-api kafka-producer-api


【解决方案1】:

如果我正确理解您的问题,您有一个主题和不同的消费者,他们对该主题的特定部分感兴趣。同时,您不拥有这些消费者,想要避免那些消费者只是阅读整个主题并自己进行过滤?

为此,构建一个新应用程序的唯一方法是读取整个主题,进行过滤(或实际拆分)并将数据写回两个(多个)不同的主题。外部消费者将从这些新主题中消费,并且只收到他们感兴趣的日期。

为此目的使用 Kafka Streams 将是一个非常好的方法。 DSL 应该提供您需要的一切。

作为替代方案,您可以使用KafkaConsumerKafkaProducer 编写自己的应用程序,在您的用户代码中手动进行过滤/拆分。这与使用 Kafka Streams 没有太大区别,因为 Kafka Streams 应用程序会在内部执行完全相同的操作。但是,使用 Streams,您完成它的工作量会少很多。

我不会为此使用拦截器。即使这可行,它似乎也不是您用例的好软件设计。

【讨论】:

  • 谢谢。您的理解是正确的。有些消费者属于我们的应用程序,有些则不是。那么进行过滤的唯一方法是消费数据并将填充数据写入新主题?
  • 是的。如果您不拥有某些消费者,您只能通过将他们感兴趣的部分写入新主题来阻止他们阅读所有内容。
【解决方案2】:

在为消费者设置“interceptor.classes”配置之前,创建您自己的实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 的拦截器类并在方法“onConsume”中实现您的逻辑。

【讨论】:

  • ,因为拦截器主要是为记录和监控目的而设计的。在拦截器上实现过滤将意味着有效地实现过滤 inc 消费者代码。
  • 所以你想在服务器端写过滤逻辑?
  • 是的,没有代理过滤可用。如果我们再次在主题上发布过滤后的数据,我们如何实现它(帖子中的第 1 点和第 2 点)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-10-01
  • 2013-02-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-02
相关资源
最近更新 更多