【问题标题】:Flink checkpoint fails randomlyFlink 检查点随机失败
【发布时间】:2021-09-13 22:38:11
【问题描述】:

我正在 kubernetes 中运行一项 Flink 作业。我的设置如下

  1. 1 个作业管理器窗格
  2. 4 个任务管理器 Pod
  3. 3500 GB SSD 硬盘通过 NFS 持久卷声明在作业管理器和任务管理器 pod 之间共享,用于检查点、保存点和 RocksDB 状态后端
  4. 使用 state.backend.rocksdb.localdir 将 Rocksdb 数据刷新到附加的卷声明目录
  5. 启用增量检查点

我的 flink 作业从 kafka 源获取时间序列数据(时间、值),进行聚合和一些其他转换并将其发布到 kafka sink。

有时我的工作因检查点异常(10 分钟后超时)而失败,主要是由一名操作员完成。我不明白异步持续时间(在图像中)的含义以及为什么它花费的时间最长。在此异常之前,来自 kafka 的 5-8 百万条记录的吞吐量非常高,但在此检查点异常之后,它变得非常非常慢。另一个观察结果是增加并行性无助于增加吞吐量。我也启用了未对齐的检查点,但它没有多大帮助。

Flink 版本:1.13.2 提交:5f007ff @ 2021-07-23T04:35:55+02:00

感谢任何帮助。

编辑:

在大卫发表评论后,我在集群中进行了以下更改

我添加了一个单独的节点池,其中包含 10 个节点和 1 个本地 ssd(默认大小为 375 GB)连接到每个节点,并在此节点池中调度我的 TM pod。另外,我将rocksdb localdir 设置为将状态数据溢出到磁盘。我有 1 个 kafka 源,可以流式传输 10 亿个时间序列数据。结果如下:

检查点间隔 = 15 分钟,同时启用未对齐检查点

  1. 第一次运行,吞吐量真的很高。它在前约 40 分钟内处理了超过 3 亿个数据点。然后第三个检查点因超时而失败。超时后吞吐量下降
  2. 取消了第一份工作并重新提交了工作增益。吞吐量仍然很低,从约 1 亿/分钟降至约 100 万/分钟。
  3. 已删除 flink 集群,重新创建并重新提交作业。自过去 18 小时以来运行良好,没有检查点故障,但吞吐量非常低。它只处理了 6.25 亿。

问题

  1. 为什么第一次运行时吞吐量真的很高,然后又下降了?
  2. 第一次运行时仍然观察到检查点故障?
  3. 取消第一次运行的作业并重新提交它并没有帮助获得相同的吞吐量?
  4. 虽然作业现在运行良好,没有检查点故障,但为什么所有本地 ssd 设置的吞吐量仍然很低?
  5. 为什么检查点大小波动如此之大?有时多,有时少?

【问题讨论】:

    标签: kubernetes apache-flink


    【解决方案1】:

    我要对您的设置进行更改的第一件事是将 state.backend.rocksdb.localdir 移动到每个 TM pod 本地的临时存储。如果 RocksDB 本地磁盘实际上是 NFS 挂载,则可能会导致问题。

    至于异步持续时间,检查点分两个阶段完成。一个(希望是简短的)同步阶段,在此期间暂停流处理,以便可以检查一些关键内容,然后是一个通常更长的异步阶段,在此期间检查键控状态(在与正在进行的流处理不同的线程中)。

    【讨论】:

    • 感谢您的回复。我了解本地 ssd 提供高 IOPS 和吞吐量,但它与 k8s 集群中的节点具有节点亲和力。可以说,我有 4 个 k8s 集群节点。我需要为每个节点附加本地 ssd,因此一旦我的 TM pod 调度到任何节点,它总是与本地 ssd 有某种关联。这是正确的假设吗?如果这个假设是正确的,那么如果 TM pod 重新启动并调度到与之前不同的节点,是否可以。
    • 另一个问题,由于本地ssd存储是短暂的,可以松动状态吗?如果这个存储在检查点之前就死了怎么办?很抱歉有很多问题。提前致谢。
    • 为了让 Flink 运行良好,你希望你的工作状态在最快的可用本地存储中——RAM 或本地 SSD。 Flink 被设计成不依赖于本地、工作状态的生存。恢复后的正确性仅取决于检查点。如果 Flink 在完成第一个检查点之前确实失败了,则从头开始重新启动作业。
    • 大卫,我按照您的意见进行了尝试。用我的 cmets 更新了原始问题。你能帮忙吗?谢谢。
    【解决方案2】:

    完全同意 David,您应该使用本地磁盘。这里没有耐久性但性能要求。您可以查看此博客文章 [1] 了解详细信息。至于吞吐量下降,您是否检查了本地磁盘的指标?它们是可爆裂的磁盘吗?你可能在那里遇到瓶颈。检查此博客文章 [2] 以查看它是否符合您的情况。如果不是磁盘,还要检查 CPU 和内存。

    [1]https://flink.apache.org/2021/01/18/rocksdb.html

    [2]https://www.ververica.com/blog/the-impact-of-disks-on-rocksdb-state-backend-in-flink-a-case-study

    【讨论】:

      【解决方案3】:

      看起来您有一个用例,其中未对齐的检查点实际上根本没有帮助,反而会恶化它。

      未对齐的检查点适用于您遇到突然峰值或背压持续时间较长的用例 - 因此一开始您的吞吐量非常低的情况。

      现在未对齐的检查点正在检查点中持久保存瞬态数据。这就是为什么大小会爆炸并且实际上会根据网络缓冲区的饱和程度而发生很大变化的原因。

      从未对齐的检查点恢复也需要更长的时间(需要恢复更多数据)。


      现在的问题仍然是为什么存在未对齐检查点的超时。理论上,只有当您的 I/O 成为瓶颈时才会发生这种情况。据我所知,未对齐的检查点会将您的检查点大小从 1 GB 扩大到 9 GB。

      另一种解释实际上可能是 sink 端的完全瓶颈;由于您可能使用了KafkaConsumer 而不是新的KafkaSource,因此需要最少的数据流。但我会从你的解释中排除这一点。

      为了进一步调查,我需要一个超时检查点的屏幕截图(每个任务的详细统计信息)和日志。


      最后还有一个关于整体吞吐量的问题,即使作业完全重新启动。提供的信息很难回答这个问题。我猜你有一些有状态的操作,堆状态后端在达到某个状态大小后只是运行次优。您可以尝试调整您的 statebackend。例如,一定要为 SSD 调整 Rocks DB。

      【讨论】:

        猜你喜欢
        • 2021-01-27
        • 2019-12-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-11-13
        • 2021-01-10
        • 2014-03-23
        • 1970-01-01
        相关资源
        最近更新 更多