【问题标题】:How to stop high load from leading to cascading Flink checkpoint failures如何阻止高负载导致级联 Flink 检查点故障
【发布时间】:2020-03-08 19:42:03
【问题描述】:

我会主动提出几点:

  1. 我是 Flink 新手(使用它大约一个月了)
  2. 我正在使用 Kinesis Analytics(AWS 托管的 Flink 解决方案)。无论如何,这并没有真正限制 Flink 的多功能性或容错选项,但我还是会说出来。

我们有一个相当简单的滑动窗口应用程序。键控流通过特定键(例如 IP 地址)组织事件,然后在 ProcessorFunction 中处理它们。我们主要使用它来跟踪事物的数量。例如,过去 24 小时内特定 IP 地址的登录次数。每 30 秒,我们计算窗口中每个键的事件,并将该值保存到外部数据存储中。状态也会更新以反映该窗口中的事件,以便旧事件过期并且不占用内存。

有趣的是,基数不是问题。如果我们有 20 万人登录,在 24 小时内,一切都是完美的。当一个 IP 在 24 小时内登录 20 万次时,事情开始变得棘手。此时,检查点开始花费越来越长的时间。一个平均检查点需要 2-3 秒,但根据这种用户行为,检查点开始需要 5 分钟,然后是 10 分钟,然后是 15 分钟,然后是 30 分钟,然后是 40 分钟,等等。

令人惊讶的是,应用程序可以在这种情况下平稳运行一段时间。也许 10 或 12 个小时。但是,迟早检查点会完全失败,然后我们的最大迭代器年龄开始飙升,并且没有新的事件被处理等等。

此时我已经尝试了一些方法:

  1. 在问题上投入更多金属(自动缩放也已打开)
  2. 大惊小怪 CheckpointingInterval 和 MinimumPauseBetweenCheckpoints https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
  3. 重构以减少我们存储的状态的足迹

(1) 并没有真正做太多。 (2) 这似乎有所帮助,但随后又一次比我们之前看到的更大的流量峰值压制了任何好处 (3) 目前尚不清楚这是否有帮助。我认为我们的应用程序内存占用与你想象的 Yelp 或 Airbnb 相比相当小,他们都使用 Flink 集群来处理大型应用程序,所以我无法想象我的状态真的有问题。

我会说我希望我们不必大幅改变应用程序输出的期望。这个滑动窗口是一个非常有价值的数据。

编辑:有人问我的状态是什么样的 ValueState[FooState]

case class FooState(
                         entityType: String,
                         entityID: String,
                         events: List[BarStateEvent],
                         tableName: String,
                         baseFeatureName: String,
                       )

case class BarStateEvent(target: Double, eventID: String, timestamp: Long)

编辑: 我想强调一下用户 David Anderson 在 cmets 中所说的话:

有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。

这是必不可少的。对于其他试图走这条路的人,我找不到一个可行的解决方案,它不会将事件存储在某个时间片中。我的最终解决方案是将事件分成 30 秒的批次,然后按照 David 的建议将它们写入地图状态。这似乎可以解决问题。对于我们的高负载期,检查点保持在 3mb 并且它们总是在一秒钟内完成。

【问题讨论】:

  • 您使用的是哪个州后端?您是否尝试过增量检查点(需要使用 RocksDB 状态后端)?
  • 当前使用 RocksDB 和增量检查点
  • “我的最终解决方案是将事件分成 30 秒的批次”你有一个小例子来说明代码中的样子吗?我们还尝试对 24 小时滑动窗口进行一些计数,并且遇到了同样的问题,即每个检查点的检查点大小爆炸到 19GB(我们使用 SQL 和 GROUP BY 来计算)

标签: apache-flink amazon-kinesis amazon-kinesis-analytics


【解决方案1】:

如果您有一个 24 小时长的滑动窗口,并且它滑动 30 秒,那么每次登录都会分配给 2880 个单独的窗口中的每一个。没错,Flink 的滑动窗口是复制的。在本例中为 24 * 60 * 2 份。

如果您只是计算登录事件,则无需实际缓冲登录事件,直到窗口关闭。您可以改为使用ReduceFunction 来执行incremental aggregation

我的猜测是您没有利用此优化,因此当您有一个热键(IP 地址)时,处理该热键的实例具有不成比例的数据量,并且需要很长时间才能检查点。

另一方面,如果您已经在进行增量聚合,并且检查点与您描述的一样有问题,那么值得更深入地研究以尝试了解原因。

一种可能的补救方法是使用ProcessFunction 实现您自己的滑动窗口。这样做可以避免维护 2880 个单独的窗口,并使用更高效的数据结构。

编辑(基于更新的问题):

我认为问题在于:使用 RocksDB 状态后端时,状态以序列化字节的形式存在。每个状态访问和更新都必须经过 ser/de。这意味着您的 List[BarStateEvent] 正在被反序列化,然后在您每次修改它时重新序列化。对于列表中包含 200k 事件的 IP 地址,这将是非常昂贵的。

您应该改为使用ListStateMapState。这些状态类型针对 RocksDB 进行了优化。 RocksDB 状态后端可以附加到 ListState 而无需反序列化列表。并且使用MapState,映射中的每个键/值对都是一个单独的 RocksDB 对象,可以进行高效的查找和修改。

有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。在Flink docs 中有一个类似的例子(但有翻滚的窗口)。

或者,如果您的状态可以放入内存,您可以使用 FsStateBackend。那么你的所有状态都将是 JVM 堆上的对象,而 ser/de 只会在检查点和恢复期间发挥作用。

【讨论】:

  • “一种可能的补救方法是使用 ProcessFunction 实现您自己的滑动窗口。这样做可以避免维护 2880 个单独的窗口,并使用更有效的数据结构。”是的!这正是我正在做的。我将事件存储在 state 中,然后每 30 秒(使用 ProcessFunction onTimer 回调)发出每个键的新计数,当它自上次计数以来发生变化时。
  • 您所在的州是如何组织的?是某种集合类型的 ValueState 吗?
  • 更新了问题以包含它。它是一个 FooState 类型的对象,带有一些元数据(字符串)和一个“BarStateEvent”类型的现有事件列表,我在上面概述了这两者。
  • 谢谢!我要试试这个,过几天回来报告进度。
  • 我已经编辑了我的答案以包含一个指向 ci.apache.org/projects/flink/flink-docs-stable/learn-flink/… 的链接,这是一个类似的例子,但适用于翻滚的窗口。
猜你喜欢
  • 1970-01-01
  • 2021-09-11
  • 1970-01-01
  • 1970-01-01
  • 2011-12-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多