【发布时间】:2021-06-05 05:19:55
【问题描述】:
我正在开发一个旧的 Flink 管道,我们想要更改我们正在使用的接收器的实现。我们正在运行 Flink 1.10 正在尝试从 BucketingSink 转换到 StreamingFileSink,两者都将 ORC 写入同一个目的地。 我们的管道非常简单:我们将一些 kakfa 流合并到我们的单个接收器中(没有其他运算符)。
在部署测试期间,我注意到当我们从 sink1 切换到 sink2(反之亦然)时,我们最终会在我们正在编写的文件中丢失 kafka 消息(通过 hive/trino 查询)。丢失消息的 kafka 时间戳与我的部署一致,因此我相信这不是一些不相关的上游问题。
我目前的理论是,在检查点期间,接收器正在缓冲来自流的消息,因为它们等待所有检查点屏障,并且这些缓冲的事件被捕获为该接收器的检查点状态的一部分,并且 kafka 消息来源相信那些偏移量已交付/处理(即使它们尚未写入文件,但仅存在于接收器的缓冲区中)。因此,当我使用不同的接收器进行部署并从使用旧接收器创建的检查点开始时,那些缓冲的消息会丢失。我正在寻找确认这些接收器是否将缓冲的事件写入检查点状态,并导致 kafka 源将它们视为“已处理”,即使它们尚未写入文件。
我们的时间线如下所示:
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌────────┐ ┌──────┐
│Sink1 │ │Sink1 │ │Sink1 │ │Deploy│ │ Resume │ │Sink2 │
│ CP1 │─▶│ CP2 │─▶│ CP3 │─▶│Sink2 │─▶│from CP3│─▶│ CP4 │
└──────┘ └──────┘ └──────┘ └──────┘ └────────┘ └──────┘
我们最终在“Sink1 CP3”和“Sink2 CP4”之间写入到 ORC 文件的 kafka 消息存在间隙。所以我相信有些东西正在推进我们的 kafka 源中的 kafka 偏移量(尽管我们的源没有以任何方式改变)所以 kafka 源认为我们已经处理了这些缓冲的消息,并且在我们从 CP3 恢复后不会将它们发送到 Sink2 .更奇怪的是:如果我回到 Sink1 并从 CP4+ 恢复,CP3 和 CP4 之间缺少的事件会被写入!并且没有写入重复的事件,因此它不会将 kafka 源倒回到旧的偏移量并在 CP3 之后重新处理所有消息。
那么,我是否走在正确的轨道上,并且 kafka 源的偏移量已提前用于缓冲消息?有没有办法安全地从一个接收器过渡到另一个接收器而不会丢失这些狭窄的 kafka 消息片段?
【问题讨论】:
标签: apache-flink