【问题标题】:Python Kafka consumer missing to poll some messagesPython Kafka 消费者缺少轮询一些消息
【发布时间】:2018-09-29 19:17:56
【问题描述】:

我的 Kafka 消费者的代码如下所示

def read_messages_from_kafka():
    topic = 'my-topic'
    consumer = KafkaConsumer(
        bootstrap_servers=['my-host1', 'my-host2'],
        client_id='my-client',
        group_id='my-group',
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        api_version=(0, 8, 2)
    )
    consumer.assign([TopicPartition(topic, 0), TopicPartition(topic, 1)])

    messages = consumer.poll(timeout_ms=kafka_config.poll_timeout_ms, max_records=kafka_config.poll_max_records)

    for partition in messages.values():
        for message in partition:
            log.info("read {}".format(message))

    if messages:
        consumer.commit()

    next_offset0, next_offset1 = consumer.position(TopicPartition(topic, 0)), consumer.position(TopicPartition(topic, 1))
    log.info("next offset0={} and offset1={}".format(next_offset0, next_offset1))

while True:
    read_messages_from_kafka()
    sleep(kafka_config.poll_sleep_ms / 1000.0)

我意识到这种消费者设置无法读取所有消息。而且我无法重现此问题,因为它是间歇性问题。

当我将使用 kafka-cat 的最后 100 条消息与此消费者进行比较时,我发现我的消费者间歇性地随机错过了几条消息。我的消费者有什么问题?

kafkacat -C -b my-host1 -X broker.version.fallback=0.8.2.1 -t my-topic -o -100

只有too many ways to consume messages in python。应该有一种并且最好只有一种明显的方法来做到这一点。

【问题讨论】:

  • 您是否知道每次要阅读消息时都在创建新连接?最好将 KafkaConsumer 创建移动到您的 while True 循环之上。

标签: python apache-kafka


【解决方案1】:

您的 Kafka 客户端存在消息丢失问题。 我找到了解决方案here

while True:
    raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
    for topic_partition, messages in raw_messages.items():
        application_message = json.loads(message.value.decode())

另外还有一个Kafka客户端存在:confluent_kafka.没有这个问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-03-06
    • 2016-05-15
    • 2017-09-23
    • 1970-01-01
    • 1970-01-01
    • 2020-12-18
    • 1970-01-01
    相关资源
    最近更新 更多