【问题标题】:Flink window state size and state managementFlink 窗口状态大小和状态管理
【发布时间】:2019-03-19 18:22:02
【问题描述】:

在阅读了 flink 的文档并四处搜索之后,我无法完全理解 flink 是如何在其窗口中处理状态的。 假设我有一个每小时翻滚的窗口,它带有一个聚合函数,可以将 msgs 累积到一些 java pojo 或 scala 案例类中。 该窗口的大小将与在一小时内进入该窗口的事件数量相关联,还是仅与 pojo/case 类相关联,因为我将事件累积到该对象中。 (例如,如果将 10000 msgs 计算为整数,大小会接近 10000 * msg size 还是 int 的大小?) 此外,如果我使用 pojos 或案例类,flink 是否为我处理状态(如果内存耗尽/在检查点保存状态等溢出到磁盘)还是我必须为此使用 flink 的状态对象?

感谢您的帮助!

【问题讨论】:

    标签: apache-flink stream-processing


    【解决方案1】:

    窗口的状态大小取决于您应用的函数类型。如果您应用ReduceFunctionAggregateFunction,则会立即聚合到达的数据,并且窗口仅保存聚合值。如果您应用ProcessWindowFunctionWindowFunction,Flink 会收集所有输入记录并在时间(事件或处理时间取决于窗口类型)超过窗口结束时间时应用该函数。

    您还可以组合这两种类型的函数,即,有一个AggregateFunction,后跟一个ProcessWindowFunction。在这种情况下,到达的记录会立即聚合,当窗口关闭时,聚合结果将作为单个值传递给ProcessWindowFunction。这很有用,因为您有增量聚合(由于 ReduceFunction / AggregateFunction)而且还可以访问窗口元数据,如开始和结束时间戳(由于 ProcessWindowFunction)。

    如何管理状态取决于所选的状态后端。如果您配置FsStateBackend,所有本地状态都保留在TaskManager 的堆上,如果状态变得太大,JVM 进程将被OutOfMemoryError 杀死。如果您配置RocksDBStateBackend 状态将溢出到磁盘。这会带来每次状态访问的反序列化成本,但会为状态提供更多存储空间。

    【讨论】:

    • 非常感谢您的帮助!所以,即使我选择文件系统后端,我也会因为状态太大而变得 oom?我认为这只发生在堆状态后端。
    • 是的,FsStateBackend 可能会发生 OOM。见这里Flink official doc.@yaarix
    • FsStateBackend 的名称使 IMO 感到困惑。 Fs(文件系统)部分是指存储检查点的位置,而不是函数使用的本地状态的存储方式。对于RocksDBStateBackend,情况正好相反。 RocksDB 是本地状态存储,检查点也存储在(分布式)文件系统中。
    • @FabianHueske 这是一个非常有用的见解。只是为了确认一下,所以如果我使用的是 SQL/Table API,那么我可以安全地假设窗口函数都是 aggregateFunction?与 sql (group by) 查询一样,我看不到我们可以应用您上面提到的 ProcessingWindowFunction
    猜你喜欢
    • 1970-01-01
    • 2021-05-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多