【发布时间】:2020-07-12 01:01:06
【问题描述】:
我需要使用 Kafka Streams DSL 缓存来减少下游处理器的写入量。但是,我们的应用程序处理墓碑,这引入了复杂性。例如,给定以下单个键的记录,K1:
<K1, V1>
<K1, V2>
<K1, V3>
DSL 缓存可能只发出以下的最终记录:
<K1, V3>
当然,关闭 DSL 缓存后,它会发出所有中间记录:
<K1, V1>
<K1, V2>
<K1, V3>
到目前为止,一切都按预期进行。但是,对于墓碑,原始序列变为:
<K1, V1>
<K1, V2>
<K1, V3>
<K1, NULL>
因此,根据刷新缓存的时间,我们可能永远看不到最终计数。例如
<K1, V1> | cached
<K1, V2> | flushed
<K1, V3> | cached
<K1, NULL> | deleted
意味着<K1, V2> 被刷新,但绝不是<K1, V3>。我试图实现的语义涉及每当收到该键的墓碑时刷新缓存中给定键的最新记录。
<K1, V1> | cached
<K1, V2> | flushed
<K1, V3> | cached
<K1, NULL> | emit the latest record (`<K1, V3>`), then delete.
我无法使用 DSL 执行此操作,并且处理器 API 不会公开底层缓存,因此也无法在此处执行此操作。我正在考虑实现一个自定义内存缓存并将其与处理器 API 一起使用,但它变得复杂,因为如果应用程序不正常关闭(例如 SIGKILL),似乎可能会丢失数据。也不确定 DSL 缓存如何处理不正常的关闭(例如,可能存在数据丢失),所以也许我正在考虑的实现可以在 DSL 缓存之后建模。
无论如何,我是不是在想这个问题?有没有办法在收到墓碑时从 DSL 缓存中刷新最新记录,而不是实现自定义缓存?
【问题讨论】: