【问题标题】:How to Delete Record from Kafka global state store?如何从 Kafka 全局状态存储中删除记录?
【发布时间】:2020-08-29 12:41:53
【问题描述】:

恢复期间的全局状态存储将转储来自源主题的数据(这被认为是全局存储的更改日志主题)。

为了删除一条记录,我执行如下操作

kvStore.put("key-1",null)

Kafka 如何知道记录已被删除,并且在恢复过程中它会从源主题转储记录(考虑源主题有一条 key-1 的记录)

在我的拓扑中我有

  • 输入主题 -> T1
  • 并附加了一个从 T1 读取数据并从记录中构造一个键并向下转发到主题 T2 的进程
  • 主题 T2 是全局状态存储的源主题。

例子:

  • T1 我得到了数据:{"id":'123', "name":"Mohit", "type":"insert"}
  • 构造一个键记录并转发到主题,键和值为T2 -> 键:123 和值:{"id":'123', "name":"Mohit"}

在相同的关键记录之后作为数据类型删除。 T1得到数据:{"id":'123', "name":"Mohit", "type":"insert"}

所以我像这样转发记录

this.context.forward(key, null)
key: 123 value:null

在状态存储中更新相同

我只想知道,在恢复过程中,这条记录将被删除,这意味着如果我使用密钥 123 进入商店,我将获得空值。

【问题讨论】:

  • 不是 100% 确定我是否理解这个问题。但是,全局存储是只读的,您不能直接放置/删除数据(全局处理器只能按原样从主题中获取日期来更新存储)。

标签: apache-kafka kafka-consumer-api apache-kafka-streams kafka-producer-api


【解决方案1】:

状态存储更改日志是压缩主题。要从压缩主题中删除消息,您需要执行put(key, null) 操作。具有null 值的消息称为墓碑,它最终会被主题清理器删除。

请注意,消息只会(最终)在状态存储中被删除,而不是在输入主题中。

最后,键为123 的记录应该从状态存储中完全删除。

【讨论】:

  • 我说的是全局状态存储使用源主题作为他在恢复期间的更改日志。由于数据在源主题中存在 key:123 ,因此它将在全局存储中加载数据
  • 是的,它会加载全局状态sore的数据,但由于值为“null”,它最终会从存储中删除。
  • 我认为您别无选择,只能在(压缩的)主题中放置空值。否则将不会考虑恢复。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-06-27
  • 2018-11-15
  • 1970-01-01
  • 2020-11-09
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多