【发布时间】:2020-08-01 17:49:49
【问题描述】:
我想在我的 kafka-streams 应用程序中删除重复数据,该应用程序使用状态存储并使用这个非常好的示例:
我对这个例子有几个问题。
正如我正确理解的那样,这个例子简要地做到了这一点:
- 消息进入输入主题
- 查看store,如果不存在,写入state-store并返回
- 如果确实存在删除记录,则应用重复数据删除。
但在代码示例中,您可以确定一个时间窗口大小。此外,状态存储中消息的保留时间。您还可以通过给出时间戳 timeFrom + timeTo 来检查记录是否在商店中
final long eventTime = context.timestamp();
final WindowStoreIterator<String> timeIterator = store.fetch(
key,
eventTime - leftDurationMs,
eventTime + rightDurationMs
);
timeTo 和 timeFrom 的实际目的是什么?我不确定为什么要检查下一个时间间隔,因为我正在检查尚未进入我的主题的未来消息?
我的第二个问题与这个时间间隔有关吗?应该 HIT 上一个时间窗口吗?
如果我能够通过给出 timeTo 和 timeFrom 来搜索时间间隔,为什么时间窗口大小很重要?
如果我将窗口大小设置为 12 小时,我能否保证我在 12 小时内对消息进行了重复数据删除?
我是这样想的:
第一条消息在应用程序启动的第一分钟带有“A”键,11小时后,带有“A”键的消息再次出现。我可以通过提供足够的时间间隔(例如 eventTime - 12hours )来捕捉这个重复的消息吗?
感谢您的任何想法!
【问题讨论】:
标签: apache-kafka duplicates apache-kafka-streams