【发布时间】:2018-07-26 18:19:29
【问题描述】:
我正在使用 Apache Flink 1.3.2 集群。我们正在使用 Kafka 消息,自从将代理升级到 1.1.0(从 0.10.2)后,我们经常在日志中注意到这个错误:
ERROR o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null
因此,有时我们会在处理过程中遇到丢失事件。我们在工作中使用 FlinkKafkaConsumer010。
检查点已启用(间隔 10 秒,超时 1 分钟,检查点之间的最小暂停 5 秒,最大并发检查点 1。E2E 持续时间平均不到 1 秒,甚至我会说不到半秒。)使用相同的设置Kafka 0.10.2 我们没有这个例外。
更新:我们已经重新安装了 Kafka,现在我们收到一条警告消息,但仍然没有读取任何事件
WARN o.a.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
【问题讨论】:
-
您是否启用了检查点?如果未启用,Flink 会定期提交 Kafka 偏移量,并且它不是检查点状态的一部分。当消费来自 Kafka 的消息时抛出异常时,可能会丢失一些消息,因为它们可能尚未被流式作业中的所有任务处理。
-
详细更新,感谢提问
-
偏移提交失败并不意味着数据丢失。我希望可能有重复的数据,但不会丢失数据。你确定你正在丢失数据吗?如果是这样,那么日志中肯定还有其他内容,例如由于超时而无法获取数据。 Kafka 客户端上的 DEBUG 级别日志记录可能会提供更多洞察力。
-
@dawsaw 有,我不知何故没有粘贴完整的消息“错误 o.a.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - 异步 Kafka 提交失败。org.apache.kafka.clients.consumer.RetriableCommitFailedException :偏移量提交失败,出现可重试异常。您应该重试提交偏移量。原因:org.apache.kafka.common.errors.DisconnectException: null"
标签: apache-flink flink-streaming flink-cep