【问题标题】:Python kafka consumer group id issuePython kafka消费者组ID问题
【发布时间】:2016-07-15 05:41:25
【问题描述】:

阿法伊克,

kafka 中引入了分区和(消费者)组的概念来实现并行。我正在通过python使用kafka。我有一个特定的主题,它有(比如说)2 个分区。这意味着,如果我启动一个包含 2 个消费者的消费者组,他们将被映射(订阅)到不同的分区。

但是,在 python 中使用kafka 库时,我遇到了一个奇怪的问题。我启动了 2 个具有基本相同 group-id 的消费者,并启动了线程让他们消费消息。

但是,kafka-stream 中的每条消息都被他们俩消费了!!这对我来说似乎很荒谬,甚至在概念上也不正确。无论如何我可以手动将消费者映射到某些(不同的)分区(如果它们没有自动映射到不同的分区)?

代码如下:

from kafka import KafkaConsumer
import thread

def con1(consumer):
    for msg in consumer:
        print msg

consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])

thread.start_new_thread(con1, (consumer1,))
thread.start_new_thread(con1, (consumer2,))

这是我使用 kafka-console-producer 生成的一些消息的输出:

ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')

虽然预期是其中之一。 BTW,这个话题k-test有2个分区。

【问题讨论】:

标签: python kafka-consumer-api kafka-python


【解决方案1】:

我猜你正在使用 Kafka 0.8 或更低版本,基于documents 不支持此功能:

... 但是,某些功能只能在较新的代理上启用;为了 例如,完全协调的消费者群体——即动态分区 分配给同一组中的多个消费者 - 需要使用 0.9+ kafka 经纪人 ...

【讨论】:

    【解决方案2】:
    from kafka import KafkaConsumer
    from kafka import TopicPartition
    
    TOPIC = "k-test"
    PARTITION_0 = 0
    PARTITION_1 = 1
    
    consumer_0 = KafkaConsumer(
        TOPIC, group_id='grp1', bootstrap_servers=['10.50.23.120:9092']
    )
    consumer_1 = KafkaConsumer(
        TOPIC, group_id='grp1', bootstrap_servers=['10.50.23.120:9092']
    )
    topic_partition_0 = TopicPartition(TOPIC, PARTITION_0)
    topic_partition_1 = TopicPartition(TOPIC, PARTITION_1)
    # format: topic, partition
    consumer_0.assign([topic_partition_0])
    consumer_1.assign([topic_partition_1])
    

    assign() 可能对你有用,但是一旦你使用它,当消费者停止工作时,kafka 不会自动平衡消费者。

    【讨论】:

    • 那么特定分区如何在不再次明确分配的情况下获得任何消费者?那么它不会使分布式系统的可用性属性无效吗?
    【解决方案3】:

    尝试运行 bin/kafka-consumer-groups.sh 命令行工具来验证您使用的 Python Kafka 客户端是否支持正确的消费者组管理。如果两个消费者确实在同一个组中,那么他们应该从互斥分区获取消息。

    【讨论】:

      【解决方案4】:

      根据我的经验,密钥必须长于 4 个字符,否则一切都转到 partition0

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-06-19
        • 2014-10-25
        • 2017-08-24
        • 1970-01-01
        • 2019-05-01
        • 1970-01-01
        • 1970-01-01
        • 2020-04-02
        相关资源
        最近更新 更多