【发布时间】:2018-12-13 08:01:16
【问题描述】:
我有一个关于带有 Kafka Streams 的 TimeWindows 的问题,有些概念真的让我很困惑。
我们有一个每天有 1000 万个事件的主题,我们的日志保留时间为 6 天,因此主题总共包含 6000 万个事件。
实际上,我们只对当前事件感兴趣,其余的仅出于审核原因保留 5 天。
现在我从它创建了一个 KTable,我正在执行加载所有操作并迭代事件。正如我之前提到的,实际上我们只对当天感兴趣的事件而不是 6000 万个事件,所以我在 KTable 定义中对这些数据进行了窗口化。
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1))
现在,当我使用以下语句加载所有事件时,一切运行正常。
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
这一天早些时候的问题,这将加载允许 100 万个事件,但稍后会加载 1000 万个事件,所以我必须迭代超过 1000 万个事件,而我们在批处理模式下工作,我想我可以进一步优化这个并且仅加载最后一小时的事件,因此对于相同的 KTable 配置,我尝试使用以下语句。
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但令我惊讶的是,这并没有返回任何数据。
有人可以解释为什么这没有返回任何结果,我想我误解了 TimeWindow 概念中的某些内容。
然后我做了一些进一步的测试,并将我的 KTable 配置更改为以下。
.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)))
现在这个查询功能如我所愿
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但我不确定我是否在正确的路径...
如果我将以下语句用于最新的 KTable 配置,这是否会为我带来当天的 1000 万个事件?
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
【问题讨论】:
标签: apache-kafka apache-kafka-streams