【问题标题】:Kafka Streams DSL Cache - Handle TombstonesKafka Streams DSL 缓存 - 处理墓碑
【发布时间】:2020-07-12 01:01:06
【问题描述】:

我需要使用 Kafka Streams DSL 缓存来减少下游处理器的写入量。但是,我们的应用程序处理墓碑,这引入了复杂性。例如,给定以下单个键的记录,K1

<K1, V1>
<K1, V2>
<K1, V3>

DSL 缓存可能只发出以下的最终记录:

<K1, V3>

当然,关闭 DSL 缓存后,它会发出所有中间记录:

<K1, V1>
<K1, V2>
<K1, V3>

到目前为止,一切都按预期进行。但是,对于墓碑,原始序列变为:

<K1, V1>
<K1, V2>
<K1, V3>
<K1, NULL>

因此,根据刷新缓存的时间,我们可能永远看不到最终计数。例如

<K1, V1>       | cached
<K1, V2>       | flushed
<K1, V3>       | cached
<K1, NULL>     | deleted

意味着&lt;K1, V2&gt; 被刷新,但绝不是&lt;K1, V3&gt;。我试图实现的语义涉及每当收到该键的墓碑时刷新缓存中给定键的最新记录。

<K1, V1>       | cached
<K1, V2>       | flushed
<K1, V3>       | cached
<K1, NULL>     | emit the latest record (`<K1, V3>`), then delete.

我无法使用 DSL 执行此操作,并且处理器 API 不会公开底层缓存,因此也无法在此处执行此操作。我正在考虑实现一个自定义内存缓存并将其与处理器 API 一起使用,但它变得复杂,因为如果应用程序不正常关闭(例如 SIGKILL),似乎可能会丢失数据。也不确定 DSL 缓存如何处理不正常的关闭(例如,可能存在数据丢失),所以也许我正在考虑的实现可以在 DSL 缓存之后建模。

无论如何,我是不是在想这个问题?有没有办法在收到墓碑时从 DSL 缓存中刷新最新记录,而不是实现自定义缓存?

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    我们可能永远看不到最后的计数

    我理解你的意思,但是,对于这种情况,“最终”记录是墓碑,所以你确实看到了最后一个。你想要的是一个特定的中间结果。 DSL 不允许如此细粒度的配置来执行此操作。

    处理器 API 不公开底层缓存

    嗯,确实如此。使用Stores.keyValueStoreBuilder(),您可以在返回的StoreBuilder 上调用withCachingEnabled()。请注意,对于这种情况,默认情况下不会向下游发出任何记录,您需要手动实现发出逻辑。即,您不知道缓存何时刷新,如果刷新,它只会刷新到本地磁盘和更改日志主题,但不会在刷新时向下游发出数据。

    您可以注册一个标点符号以定期发出数据。此外,每次处理墓碑时,都可以在对 store 执行删除操作之前从 store 发出当前存储的值。

    【讨论】:

    • 谢谢!我也是这么想的。我不确定处理 SIGKILL 的最佳方式。如果我有一个自定义的内存缓存或映射来存储我想要定期(通过标点符号)发出的记录,并且收到 SIGKILL,似乎任何等待发出的数据都会丢失。我假设 DSL 记录缓存可以更优雅地处理这个问题,因为缓存刷新与状态存储和提交间隔更紧密地同步。需要深入研究该代码,但好奇您是否有防止这种数据丢失的想法。
    • 正确。使用自己的内存缓存无法获得容错。您需要使用适当的存储(该存储将在提交偏移量之前被刷新,因此您永远不会丢失数据。顺便说一句:您也可以使用内存存储而不是持久存储(两者都由更改日志主题备份)和容错)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-02-03
    • 1970-01-01
    • 2015-06-14
    • 2023-03-08
    • 1970-01-01
    • 1970-01-01
    • 2017-01-07
    相关资源
    最近更新 更多