【发布时间】:2021-10-29 09:22:22
【问题描述】:
有谁知道如何使用 Faust 实现滑动窗口?
这个想法是在 10、30、60 和 300 秒的窗口中计算键的出现次数,但我们需要在 1 秒或每次更新的基础上进行。
我有一个狡猾的解决方法,这似乎非常低效,我有一个 300 秒的翻滚窗口,然后我使用 delta() 方法将表中的所有旧值与当前值相加。它似乎可以处理来自 6 个源的消息,每个源以 10 条消息/秒的速度运行,但这大约是我们看到延迟之前的限制。这显然是一种无法扩展的缓慢方法,所以问题是如何在不需要 KSQL 或设置 Spark 集群以及 Kafka 集群的情况下实现这一点。如果可以,我们会尽量保持简单。
更复杂的是,我们非常希望在过去 24 小时、1 周、1 个月和过去 3 个月内获得相同的统计数据......所有这些都在运行中。但也许我们只是要求太多,而没有为每个输入设置专门的流程。
这是我的狡猾代码:
class AlarmCount(faust.Record, serializer='json'):
event_id: int
source_id: int
counts_10: int
counts_30: int
counts_60: int
counts_300: int
@app.agent(events_topic)
async def new_event(stream):
async for value in stream:
# calculate the count statistics
counts_10=0
counts_30=0
counts_60=0
counts_300=0
event_counts_table[value.global_id] += 1
for i in range(300):
if(i<=10):
counts_10+=event_counts_table[value.source_id].delta(i)
if(i<=30):
counts_30+=event_counts_table[value.source_id].delta(i)
if(i<=60):
counts_60+=event_counts_table[value.source_id].delta(i)
if(i<=300):
counts_300+=event_counts_table[value.source_id].delta(i)
await event_counts_topic.send(
value=EventCount(
event_id=value.event_id,
source_id=value.source_id,
counts_10=counts_10,
counts_30=counts_30,
counts_60=counts_60,
counts_300=counts_300
)
)
【问题讨论】:
-
我只是想出了一个更好的选择,它肯定更稳定,但如果大小或速率变大,我不使用增量执行计数,而是存储一个在窗口中列出结果并删除窗口部分。然后我将新结果附加到列表中并从列表中删除那些过期的结果,然后将列表的长度报告给
event_counts_topic。使用 python 快速检查显示约 50 毫秒来按时间戳过滤 1M 项目列表。所以这更好。但肯定还有更好的解决方案。 -
因此,上述解决方案的效率不如预期。我通过编写一个 while 循环来更好地做到这一点,该循环删除列表中的第一个元素,直到第一个元素不在到期日期之前。但随后遇到了另一个问题,即随着列表的增加,将大文件写回磁盘的时间也增加了。最终这变得不可忽略(在几十毫秒),因此需要另一种方法。
标签: apache-kafka-streams faust ktable