【发布时间】:2021-09-03 16:19:22
【问题描述】:
我们有一个从 kafka 读取 msg 的 flink 流应用程序。 由于某种原因,我们不得不从 kafka 重置命令将 kafka 偏移重置为最新的,因为有大量堆积。我们希望 flink 应用跳过所有这些消息并从重置后出现的新消息开始。
问题是因为 flink 在内部管理它的偏移量,它不知道这个重置,它现在只从向后读取 msg(重置之前的偏移点),现在也无法提交偏移量。因此,每次重新启动 flink 应用程序时,它都会再次从同一点读取。所以我们每次重启都有重复的味精。
我知道我们不应该在 flink kafka 应用程序中手动重置偏移量。但是我们如何从中恢复过来。
我尝试将 auto.offset.config 设置为最新,但它仍然会再次读取这些消息。
【问题讨论】:
-
您是否正在从保存点/检查点重新启动您的 Flink 应用程序?如果是这样,您这样做是因为您要保留其他状态(除了 kafka 偏移量)吗?
-
检查点已启用,检查点的默认时间。没有保存点。我们从 kafla 命令直接重置偏移量。我们没有保留其他状态。我们希望我们的 flink 应用程序从最新的 kafka 偏移量读取 msg,但问题是 flink 偏移量不同(因为我们将偏移量从 kafla 重置为最新的)。我们如何才能从一次又一次地收到这些消息中恢复过来。
标签: apache-kafka flink-streaming