【问题标题】:External processing using Kafka Streams使用 Kafka Streams 进行外部处理
【发布时间】:2019-12-14 15:43:32
【问题描述】:

关于使用外部数据丰富消息有几个问题,建议几乎总是相同的:ingest external data using Kafka Connect and then join the records using state stores。尽管它适用于大多数情况,但还有一些其他用例不适用,例如 IP 到位置和用户代理检测等。

使用基于 IP 的位置丰富消息通常需要lookup by a range of IPs,但目前没有提供这种功能的内置状态存储。对于用户代理分析,如果您依赖第三方服务,您除了执行外部调用之外别无选择。

我们花了一些时间思考这个问题,并提出了在支持范围查询的数据库(如 Postgres)上实现自定义状态存储的想法。我们还可以在状态存储后面抽象出一个外部 HTTP 或 GRPC 服务,但我们不确定这是否是正确的方法。

从这个意义上说,当您在流处理过程中无法避免查询外部服务但仍必须保证容错时,推荐的方法是什么?当状态存储在检索数据时发生错误(例如,请求失败)时会发生什么? Kafka Streams 是否重试处理消息?

【问题讨论】:

  • “在状态存储后面抽象一个外部 HTTP 或 GRPC 服务” - 这不是交互式查询的作用吗?
  • IQ 允许您将数据从商店获取到您的应用程序中——问题是关于将外部数据获取到Processor

标签: apache-kafka apache-kafka-streams


【解决方案1】:

通常,内置商店支持KeyValueStore#range(fromKey, toKey)。因此,最好了解您尝试执行的范围查询是如何完成的?另请注意,在内部,所有内容都存储为 byte[] arrasy 并且 RocksDB(默认存储引擎)相应地对数据进行排序 - 因此,如果您开始推理字节布局,您实际上可以实现非常复杂的范围查询,并传入相应的将“前缀键”转换为#range()

如果您确实需要调用外部服务,您有“两个”选项来不丢失数据:如果外部调用失败,则抛出异常并让 Kafka Streams 死掉。这显然不是一个真正的选择,但是,如果您从外部查找中吞下错误,您将“跳过”输入消息并且它将未被处理。 Kafka Streams 无法知道处理“失败”(它不知道您的代码做了什么)并且不会“重试”,但认为消息已完成(如果您将其过滤掉,则类似)。

因此,为了使其工作,如果外部调用失败,您需要将用于触发查找的所有数据放入状态存储中,然后重试(即,查找存储以查找未处理的数据和重试)。当您处理下一条输入消息时,此重试可以是“辅助任务”,您可以安排标点符号来实现重试。请注意,此机制会更改处理记录的顺序,这对于您的用例可能合适或不合适。

【讨论】:

  • 当外部服务中断相当罕见(SLA 99.9%)时,让应用程序死掉对于无状态应用程序来说不是一个不错的选择吗?在基础设施端处理重启允许应用退避策略、服务状态检查等。
  • 我想这取决于您的应用程序要求——从 Kafka Streams 的角度来看,恕我直言,没有真正的问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-01-07
  • 1970-01-01
  • 2018-08-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-07-28
相关资源
最近更新 更多