【问题标题】:Deduplication using Kafka-Streams使用 Kafka-Streams 进行重复数据删除
【发布时间】:2020-08-01 17:49:49
【问题描述】:

我想在我的 kafka-streams 应用程序中删除重复数据,该应用程序使用状态存储并使用这个非常好的示例:

https://github.com/confluentinc/kafka-streams-examples/blob/5.5.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java

我对这个例子有几个问题。

正如我正确理解的那样,这个例子简要地做到了这一点:

  • 消息进入输入主题
  • 查看store,如果不存在,写入state-store并返回
  • 如果确实存在删除记录,则应用重复数据删除。

但在代码示例中,您可以确定一个时间窗口大小。此外,状态存储中消息的保留时间。您还可以通过给出时间戳 timeFrom + timeTo 来检查记录是否在商店中

        final long eventTime = context.timestamp();

        final WindowStoreIterator<String> timeIterator = store.fetch(
            key,
            eventTime - leftDurationMs,
            eventTime + rightDurationMs
        );

timeTo 和 timeFrom 的实际目的是什么?我不确定为什么要检查下一个时间间隔,因为我正在检查尚未进入我的主题的未来消息?

我的第二个问题与这个时间间隔有关吗?应该 HIT 上一个时间窗口吗?

如果我能够通过给出 timeTo 和 timeFrom 来搜索时间间隔,为什么时间窗口大小很重要?

如果我将窗口大小设置为 12 小时,我能否保证我在 12 小时内对消息进行了重复数据删除?

我是这样想的:

第一条消息在应用程序启动的第一分钟带有“A”键,11小时后,带有“A”键的消息再次出现。我可以通过提供足够的时间间隔(例如 eventTime - 12hours )来捕捉这个重复的消息吗?

感谢您的任何想法!

【问题讨论】:

    标签: apache-kafka duplicates apache-kafka-streams


    【解决方案1】:

    TimeWindow 大小决定了您希望“复制”运行多长时间,永远不重复或仅在 5 分钟内运行。 Kafka 必须存储这些记录。较大的时间窗口可能会占用您服务器的大量资源。

    TimeFrom 和 TimeTo,导致您的记录(事件)可能会在 kafka 中延迟到达/处理,因此记录的事件时间是 1 分钟前,而不是现在。 Kafka 正在处理一个“旧”记录,这就是它需要处理那些不是那么旧的记录,相对于“旧”记录而言是“未来”记录。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-10-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-04-19
      • 1970-01-01
      相关资源
      最近更新 更多