【问题标题】:Flink offset went to inconsistent state on manually resetting kafka offset手动重置 kafka 偏移量时,Flink 偏移量进入不一致状态
【发布时间】:2021-09-03 16:19:22
【问题描述】:

我们有一个从 kafka 读取 msg 的 flink 流应用程序。 由于某种原因,我们不得不从 kafka 重置命令将 kafka 偏移重置为最新的,因为有大量堆积。我们希望 flink 应用跳过所有这些消息并从重置后出现的新消息开始。

问题是因为 flink 在内部管理它的偏移量,它不知道这个重置,它现在只从向后读取 msg(重置之前的偏移点),现在也无法提交偏移量。因此,每次重新启动 flink 应用程序时,它都会再次从同一点读取。所以我们每次重启都有重复的味精。

我知道我们不应该在 flink kafka 应用程序中手动重置偏移量。但是我们如何从中恢复过来。

我尝试将 auto.offset.config 设置为最新,但它仍然会再次读取这些消息。

【问题讨论】:

  • 您是否正在从保存点/检查点重新启动您的 Flink 应用程序?如果是这样,您这样做是因为您要保留其他状态(除了 kafka 偏移量)吗?
  • 检查点已启用,检查点的默认时间。没有保存点。我们从 kafla 命令直接重置偏移量。我们没有保留其他状态。我们希望我们的 flink 应用程序从最新的 kafka 偏移量读取 msg,但问题是 flink 偏移量不同(因为我们将偏移量从 kafla 重置为最新的)。我们如何才能从一次又一次地收到这些消息中恢复过来。

标签: apache-kafka flink-streaming


【解决方案1】:

只有当 Flink 从故障中恢复或者从保存点或检查点手动重启时,它才会使用检查点或保存点中记录的偏移量。

否则,Flink Kafka 消费者将从消费者组在 Kafka 代理中提交的偏移量开始读取,或者从您在代码中明确指定的偏移量开始读取,即,

myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (msecs)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

我不知道如何将这些事实与你所报告的相协调。

【讨论】:

  • 您的意思是如果我们禁用检查点/保存点,它将从 kafka 指定的偏移量中读取?但我需要为此进行代码更改。有没有其他方法可以优雅地处理这个问题?
  • 您可以启用检查点。但不要以从检查点或保存点加载其状态的模式启动作业。
  • 怎么做?
  • 你想要的行为是默认的。无论是在应用程序代码中,还是在某些部署自动化中,都必须有一些东西覆盖它。
  • 当我设置 myConsumer.setStartFromLatest();它从最新的偏移量读取,但是当我删除它时,它再次从非常旧的偏移量开始。因为我们手动弄乱了 kafka(使用 kafka 命令),所以 flink 现在无法提交任何偏移量。如何从中恢复。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-06-01
  • 2021-04-06
  • 1970-01-01
  • 1970-01-01
  • 2017-06-24
  • 1970-01-01
  • 2018-02-03
相关资源
最近更新 更多