【问题标题】:Processing Unprocessed Records in Kafka on Recovery/Rebalance在恢复/重新平衡时处理 Kafka 中未处理的记录
【发布时间】:2020-01-15 12:33:01
【问题描述】:

我正在使用 Spring Kafka 与我的 Kafka 实例进行交互。假设我有一个主题,比如 2 个以上的分区。

例如,在我的基于 Spring Kafka 的应用程序崩溃(甚至重新平衡),然后重新联机并且主题中有消息等待的情况下,我目前使用的策略是最新提交的偏移量对于每个分区都存储在外部存储中,然后我查找消费者对分区的分配,然后寻找该偏移量以恢复处理。

(这是基于我在 O'Reilly 的书中读到的策略。)

是否有更好的方法来处理这种情况以实现“恰好一次”语义并且不会错过任何等待消息?或者 Spring Kafka 是否有更好/更惯用的方式来处理这种情况?

提前致谢。

【问题讨论】:

  • 你尝试过使用Kafka的幂等生产者/消费者事务设置吗?

标签: apache-kafka spring-kafka


【解决方案1】:

您是否有理由不检查 kafka 本身的偏移量?

一般来说,您对“恰好一次”处理的选择是:

  1. 以事务方式将偏移量和副作用存储在一起。这只有在您的副作用进入支持事务的系统(例如数据库)时才有可能
  2. 使用 kafka 事务。这是 1 的简化变体,只要您的副作用发生在您从中读取的同一个 kafka 集群
  3. 想出一个方案,让您能够检测和忽略 kafka 管道下游的重复项(又名幂等性)

【讨论】:

  • 我读的这本书提出了一个与我相关的特定用例的好点(尽管是偏执的):可以处理记录并将其存储到数据库中(我正在这样做) ,但应用程序可能会在该操作和在 Kafka 中提交偏移之间崩溃。我不是 100% 熟悉使用 Kafka 事务;也许我应该调查一下。回复#3:我假设您的意思是对分区分配执行“从分区开头读取”操作...考虑到我必须通过的潜在消息数量并检查数据库是否有欺骗性,这可能是慢。
  • 启动一个数据库事务。在所述事务中存储您的输出(将结果插入 some_table?)和偏移量(将 {topic}、{partition}、{offset} 插入 offsets_table),然后提交事务 - 您要么存储结果和偏移量,要么都不存储(或者您的数据库不符合 ACID)
  • 我相信这就是我在最初的问题中所描述的:将该信息存储在单独的存储中(隐含事务性;我不明确表示不好)。不过,有没有更好的方法呢?例如。 #2从你的答案?
  • 我的回答中的 #2 将是此处提供的示例代码 - confluent.io/blog/transactions-apache-kafka。更具体地说 - gist.github.com/apurvam/…。请注意,它仅在您的输出全部进入您使用的同一个 kafka 集群时才有效
  • 啊,对,但是由于消费者将记录/副作用存储在单独的存储/数据库中,所以我在原始问题中定义的策略听起来可能是最好的选择。
猜你喜欢
  • 2018-02-21
  • 2021-05-24
  • 2019-12-30
  • 1970-01-01
  • 1970-01-01
  • 2019-09-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多