【问题标题】:Apache Flink: Do sinks store items buffered from stream during checkpoint into checkpoint state?Apache Flink:接收器是否将检查点期间从流缓冲的项目存储到检查点状态?
【发布时间】:2021-06-05 05:19:55
【问题描述】:

我正在开发一个旧的 Flink 管道,我们想要更改我们正在使用的接收器的实现。我们正在运行 Flink 1.10 正在尝试从 BucketingSink 转换到 StreamingFileSink,两者都将 ORC 写入同一个目的地。 我们的管道非常简单:我们将一些 kakfa 流合并到我们的单个接收器中(没有其他运算符)。

在部署测试期间,我注意到当我们从 sink1 切换到 sink2(反之亦然)时,我们最终会在我们正在编写的文件中丢失 kafka 消息(通过 hive/trino 查询)。丢失消息的 kafka 时间戳与我的部署一致,因此我相信这不是一些不相关的上游问题。

我目前的理论是,在检查点期间,接收器正在缓冲来自流的消息,因为它们等待所有检查点屏障,并且这些缓冲的事件被捕获为该接收器的检查点状态的一部分,并且 kafka 消息来源相信那些偏移量已交付/处理(即使它们尚未写入文件,但仅存在于接收器的缓冲区中)。因此,当我使用不同的接收器进行部署并从使用旧接收器创建的检查点开始时,那些缓冲的消息会丢失。我正在寻找确认这些接收器是否将缓冲的事件写入检查点状态,并导致 kafka 源将它们视为“已处理”,即使它们尚未写入文件。

我们的时间线如下所示:

┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐  ┌────────┐  ┌──────┐
│Sink1 │  │Sink1 │  │Sink1 │  │Deploy│  │ Resume │  │Sink2 │
│ CP1  │─▶│ CP2  │─▶│ CP3  │─▶│Sink2 │─▶│from CP3│─▶│ CP4  │
└──────┘  └──────┘  └──────┘  └──────┘  └────────┘  └──────┘

我们最终在“Sink1 CP3”和“Sink2 CP4”之间写入到 ORC 文件的 kafka 消息存在间隙。所以我相信有些东西正在推进我们的 kafka 源中的 kafka 偏移量(尽管我们的源没有以任何方式改变)所以 kafka 源认为我们已经处理了这些缓冲的消息,并且在我们从 CP3 恢复后不会将它们发送到 Sink2 .更奇怪的是:如果我回到 Sink1 并从 CP4+ 恢复,CP3 和 CP4 之间缺少的事件会被写入!并且没有写入重复的事件,因此它不会将 kafka 源倒回到旧的偏移量并在 CP3 之后重新处理所有消息。

那么,我是否走在正确的轨道上,并且 kafka 源的偏移量已提前用于缓冲消息?有没有办法安全地从一个接收器过渡到另一个接收器而不会丢失这些狭窄的 kafka 消息片段?

【问题讨论】:

    标签: apache-flink


    【解决方案1】:

    是的,在检查点期间 flink 正在使用状态存储后端,检查保存点功能以避免丢失数据 https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

    什么是保存点?保存点与检查点有何不同? # Savepoint 是流作业执行状态的一致图像,通过 Flink 的检查点机制创建。您可以使用 Savepoints 来停止和恢复、fork 或更新您的 Flink 作业。保存点由两部分组成:在稳定存储(例如 HDFS、S3 等)上包含(通常较大的)二进制文件的目录和(相对较小的)元数据文件。稳定存储上的文件代表作业执行状态图像的净数据。 Savepoint 的元数据文件(主要)包含指向稳定存储上的所有文件的指针,这些文件是 Savepoint 的一部分,以相对路径的形式。

    【讨论】:

    • 我一直在尝试保存点,但在切换接收器时,我似乎仍然会丢失事件。如果我采用一个保存点,并继续使用同一个接收器,我就没有错过任何事件。如果我使用保存点并继续使用另一个接收器,我错过了事件。保存点和检查点对我来说表现相同,因为除非使用相同的接收器,否则来自我们的 kafka 源的一些事件似乎“丢失”了。这表明检查点和快照都将缓冲的事件与特定的接收器实现相关联。
    • 我明白,我会尝试搜索“安全关闭”,其中缓冲区在完全停止之前已完全刷新,? 在其他技术中称为“优雅关闭”
    • 在 TERMINATE 中,我们希望计时器触发,因为这将清除与计时器相关的任何缓冲状态,例如非防火窗户。在这种情况下,我们应该为事件时间计时器发出 MAX_WATERMARK。对于处理时间计时器,我们目前无能为力
    • 我绝对愿意对我们的代码进行检测以说“在检查点 X 上或之后停止摄取事件并刷新缓冲区”,但我没有太多时间采用这种方法。我还刚刚注意到我们的接收器共享相同的 UID,因此我正在重新运行 UID 唯一的实验,因为这种冲突可能会毒化我的实验结果。感谢您的融合链接,非常有趣的阅读!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-05
    • 2021-01-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-16
    相关资源
    最近更新 更多