【问题标题】:Sliding window using Faust使用 Faust 的滑动窗口
【发布时间】:2021-10-29 09:22:22
【问题描述】:

有谁知道如何使用 Faust 实现滑动窗口?

这个想法是在 10、30、60 和 300 秒的窗口中计算键的出现次数,但我们需要在 1 秒或每次更新的基础上进行。

我有一个狡猾的解决方法,这似乎非常低效,我有一个 300 秒的翻滚窗口,然后我使用 delta() 方法将表中的所有旧值与当前值相加。它似乎可以处理来自 6 个源的消息,每个源以 10 条消息/秒的速度运行,但这大约是我们看到延迟之前的限制。这显然是一种无法扩展的缓慢方法,所以问题是如何在不需要 KSQL 或设置 Spark 集群以及 Kafka 集群的情况下实现这一点。如果可以,我们会尽量保持简单。

更复杂的是,我们非常希望在过去 24 小时、1 周、1 个月和过去 3 个月内获得相同的统计数据......所有这些都在运行中。但也许我们只是要求太多,而没有为每个输入设置专门的流程。

这是我的狡猾代码:

class AlarmCount(faust.Record, serializer='json'):
  event_id: int
  source_id: int
  counts_10: int
  counts_30: int
  counts_60: int
  counts_300: int

@app.agent(events_topic)
async def new_event(stream):
  async for value in stream:
    # calculate the count statistics
    counts_10=0
    counts_30=0
    counts_60=0
    counts_300=0
    
    event_counts_table[value.global_id] += 1
    
    for i in range(300):
      if(i<=10):
        counts_10+=event_counts_table[value.source_id].delta(i)
      if(i<=30):
        counts_30+=event_counts_table[value.source_id].delta(i)
      if(i<=60):
        counts_60+=event_counts_table[value.source_id].delta(i)
      if(i<=300):
        counts_300+=event_counts_table[value.source_id].delta(i)
    
    await event_counts_topic.send(
      value=EventCount(
        event_id=value.event_id,
        source_id=value.source_id,
        counts_10=counts_10,
        counts_30=counts_30,
        counts_60=counts_60,
        counts_300=counts_300
      )
    )

【问题讨论】:

  • 我只是想出了一个更好的选择,它肯定更稳定,但如果大小或速率变大,我不使用增量执行计数,而是存储一个在窗口中列出结果并删除窗口部分。然后我将新结果附加到列表中并从列表中删除那些过期的结果,然后将列表的长度报告给event_counts_topic。使用 python 快速检查显示约 50 毫秒来按时间戳过滤 1M 项目列表。所以这更好。但肯定还有更好的解决方案。
  • 因此,上述解决方案的效率不如预期。我通过编写一个 while 循环来更好地做到这一点,该循环删除列表中的第一个元素,直到第一个元素不在到期日期之前。但随后遇到了另一个问题,即随着列表的增加,将大文件写回磁盘的时间也增加了。最终这变得不可忽略(在几十毫秒),因此需要另一种方法。

标签: apache-kafka-streams faust ktable


【解决方案1】:

我想在所有窗口上进行迭代,以将最后一个值与所有其他过去值的平均值/偏差/其他聚合进行比较。

  • 类似table[key].iter_windows()
  • 而不是循环遍历所有.delta(i)

和你一样,我将实现一个带有时间戳列表的表格。如果列表太大,它将不是最理想的,因为changelog 会很胖。我们应该只流式传输已修改的内容,而不是重复每个事件的所有列表。

因此,我将创建一个包含详细信息的短期列表和一个包含聚合的长期列表。然后,每个事件只会更新短期列表。

【讨论】:

  • 正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-09-09
  • 2015-06-03
  • 1970-01-01
  • 2012-03-05
  • 2012-08-08
  • 2012-10-20
相关资源
最近更新 更多