【问题标题】:Kafka consume message in reverse orderKafka以相反的顺序消费消息
【发布时间】:2017-04-15 11:41:43
【问题描述】:

我使用 Kafka 0.10,我有一个主题 logs,我的 IoT 设备将它们的日志发布到,我的消息的关键是 device-id,因此同一设备的所有日志都在同一个分区中。

我有一个 api /devices/{id}/tail-logs 需要在通话时显示一个设备的最后 N 个日志。

目前我以一种非常低效的方式(但工作)实现它,因为我从包含设备日志的分区的开头(即最旧的日志)开始,直到达到当前时间戳。

如果我可以获取当前的最新偏移量然后向后使用消息(我需要过滤掉一些消息以仅保留我正在寻找的设备的消息),那么一种更有效的方法是)

是否可以用 kafka 做到这一点?如果不是,如何解决这个问题? (我会看到一个更繁重的解决方案是将 kafka-connect 链接到弹性搜索,然后查询弹性搜索,但为此再添加 2 个组件似乎有点矫枉过正......)

【问题讨论】:

  • 如果我完全了解您的应用程序,我不能 100% 确定。您“显示最后 N 条日志”基本上意味着来自主题或分区的最后 N 条消息?您的主题“日志”有多少个分区,如果有多个分区,您如何定义“最后 N 条消息”?正如您所说,这与时间戳有什么关系,正如您所说的“从一开始直到 [您] 达到当前时间戳”?向后阅读并不是那么容易,但是有 .seek().endOffsets().offsetForTimestamp() 你可以肯定地利用 - 要给出完整的答案,我需要更好地理解这个场景
  • 确实我的问题不是很清楚,我已经编辑了,我从 1 个分区读取了最后 N 条消息
  • 但是一个分区可能有多个device-ids 的日志——所以读取最后N 条消息似乎是不够的。如果可能有帮助,请使用 Kafka 流和交互式查询使您的 API 有状态:docs.confluent.io/current/streams/…您使用什么 Kafka 版本? 0.10.0、0.10.1 还是 0.10.2 ?
  • 是的,你是对的,实际上我想说的是 I would need to filter out some message to keep only those of the device i'm looking for ,我使用的是 0.10.2 并且仍处于“概念验证”阶段,所以如果我是需要的是测试版,我可以等。

标签: apache-kafka kafka-consumer-api


【解决方案1】:

由于您使用的是 0.10.2,我建议您编写一个 Kafka Streams 应用程序。应用程序将是有状态的,并且状态将保存每个 device-id 的最后 N 条记录/日志——如果新数据写入输入主题,Kafka Streams 应用程序将只更新它的状态(无需重新读取整个主题)。

此外,该应用程序还为您提供请求(“api /devices/{id}/tail-logs”使用Interactive Queries 功能。

因此,我不会构建一个必须为每个请求重新计算答案的无状态应用程序,而是构建一个有状态应用程序,它会为所有可能的请求(即,所有device-ids) 并且只在请求进来时返回已经计算的结果。

【讨论】:

  • 谢谢,在您的 cmets 中查看交互式查询的链接(我认为也可以将其放入此答案中),它像手套一样适合此需求
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-19
  • 1970-01-01
  • 2022-06-15
  • 1970-01-01
  • 1970-01-01
  • 2019-05-11
相关资源
最近更新 更多