【问题标题】:Messages lost when Kafka nodes are restartedKafka 节点重启时消息丢失
【发布时间】:2017-09-22 21:31:21
【问题描述】:

我在 AWS 上运行一个 3 节点的 Kafka 集群。

Kafka 版本:0.10.2.1
Zookeeper 版本:3.4

在执行一些稳定性测试时,我注意到当我关闭领导节点时消息会丢失。

这些是重现问题的步骤:

创建一个复制因子为 3 的主题,这应该使数据在所有 3 个节点上都可用。:

~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --create --topic stackoverflow --replication-factor 3 --partitions 20
Created topic "stackoverflow".
~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --describe --topic stackoverflow
Topic:stackoverflow    PartitionCount:20    ReplicationFactor:3    Configs:
    Topic: stackoverflow    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 3    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 4    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 5    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 6    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 7    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 8    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 9    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 10    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 11    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 12    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 13    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 14    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 15    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 16    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 17    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 18    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 19    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1

使用以下代码开始制作该主题:

import time
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['10.2.31.10:9092' ,'10.2.31.74:9092' ,'10.2.31.138:9092'])

try:
    count = 0
    while True:
        producer.send('stackoverflow', 'message')
        producer.flush()
        count += 1
        time.sleep(1)
except KeyboardInterrupt:
    print "Sent %s messages." % count

此时我杀死了其中一台机器并等待它返回集群。

当它回来时,我停止生产者并使用来自该主题的所有消息。

from kafka import KafkaConsumer

consumer = KafkaConsumer('stackoverflow',
                            bootstrap_servers=['10.2.31.10:9092' ,'10.2.31.74:9092' ,'10.2.31.138:9092'],
                            auto_offset_reset='earliest',
                            enable_auto_commit=False)
try:
    count = 0
    for message in consumer:
        count += 1
        print message
except KeyboardInterrupt:
    print "Received %s messages." % count

已发送的两条消息丢失。生产者没有返回任何错误。

kafka $ python producer.py
Sent 762 messages.

kafka $ python consumer.py
Received 760 messages.

我是 Kafka 的新手,所以如果有任何进一步调试的想法,我将不胜感激。或使集群更具弹性的说明。

感谢您的帮助!

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    前段时间我遇到了完全相同的问题。在调查过程中,我发现了一个有趣的特性:flush() 方法在缓冲区中的每条消息发送后返回或请求导致错误,如documentation 所述。

    我通过以下方式减轻了它:

    1. 在代理上禁用 unclean.leader.election.enabled(如果未设置,则在 kafkatrue,在 kafka 中为 false>=0.11,因此您需要在 0.10.2 上将其设置为 false)
    2. 将异步生产者(发送和刷新)更改为同步生产者:producer.send(...).get()
    3. 为 KafkaProducer init 添加了参数retries=5(以使生产者在代理关闭后仍然存在)。

    让我知道它是否适合你。

    【讨论】:

    • 感谢马吕斯的回答!我将集群升级到 0.11 并添加了 retries=5 参数。那并没有解决问题。更改为同步生产者确实做到了。 send() 失败时出现异常,因此我可以手动重试。但问题是,这不是一个完美的解决方案。我希望能够保留异步生产者,因为它使代码更简单。不停机真的不能使用异步生产者吗?
    • 要明确 - 我的帖子中的所有三个步骤都是必需的(这就是为什么仅设置 retries=5 和禁用不明确的选举并不能解决问题的原因)。使用该方法是唯一对我有用的方法。 AFAIR,在 java/scala API 中,send 方法具有类似于“故障回调”的功能,因此您可以收到消息是否未传递的通知,并且(例如)将其保存在文件中并在一段时间后从中恢复.但我在 python 中没有看到类似的功能。
    【解决方案2】:

    最后我认为丢失消息的原因是重试次数不足。在阅读了几篇关于高可用 kafka 的博文后,我注意到人们推荐的“重试次数”参数值非常高。

    在 python 中是这样的:

    producer = KafkaProducer(bootstrap_servers=[...], retries=sys.maxint)
    

    我再次进行了测试,确认没有任何消息丢失。

    【讨论】:

      猜你喜欢
      • 2019-08-10
      • 2020-01-14
      • 1970-01-01
      • 1970-01-01
      • 2021-03-17
      • 2019-07-12
      • 2019-05-20
      • 2017-10-30
      • 2021-01-05
      相关资源
      最近更新 更多