【问题标题】:Kafka Stream - TimeWindows卡夫卡流 - TimeWindows
【发布时间】:2018-12-13 08:01:16
【问题描述】:

我有一个关于带有 Kafka Streams 的 TimeWindows 的问题,有些概念真的让我很困惑。

我们有一个每天有 1000 万个事件的主题,我们的日志保留时间为 6 天,因此主题总共包含 6000 万个事件。

实际上,我们只对当前事件感兴趣,其余的仅出于审核原因保留 5 天。

现在我从它创建了一个 KTable,我正在执行加载所有操作并迭代事件。正如我之前提到的,实际上我们只对当天感兴趣的事件而不是 6000 万个事件,所以我在 KTable 定义中对这些数据进行了窗口化。

.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1))

现在,当我使用以下语句加载所有事件时,一切运行正常。

store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())

这一天早些时候的问题,这将加载允许 100 万个事件,但稍后会加载 1000 万个事件,所以我必须迭代超过 1000 万个事件,而我们在批处理模式下工作,我想我可以进一步优化这个并且仅加载最后一小时的事件,因此对于相同的 KTable 配置,我尝试使用以下语句。

store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())

但令我惊讶的是,这并没有返回任何数据。

有人可以解释为什么这没有返回任何结果,我想我误解了 TimeWindow 概念中的某些内容。

然后我做了一些进一步的测试,并将我的 KTable 配置更改为以下。

.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)))

现在这个查询功能如我所愿

store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())

但我不确定我是否在正确的路径...

如果我将以下语句用于最新的 KTable 配置,这是否会为我带来当天的 1000 万个事件?

store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    当您在窗口存储上使用交互式查询时,时间范围将应用于窗口开始时间戳。因此,如果您有一个 1 天的窗口,并从 [now - 1 hour, now) 查询窗口开始时间戳的数据,您将找不到任何匹配的窗口,因为在此时间范围内没有窗口开始。

    【讨论】:

    • 好的,第一个回答...第二个星座会工作吗?如果我有 1 小时的窗口保留 1 天,如果我用 [现在 - 1 天,现在] 查询将提供当天的所有内容还是仅提供最后一小时?
    • 我将返回一整天的所有数据。您可以查询保留时间范围内的所有内容。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-02-08
    • 2018-03-06
    • 2016-08-03
    • 2018-09-15
    • 2018-03-07
    • 2019-11-13
    • 2017-09-16
    相关资源
    最近更新 更多