【问题标题】:Python kafka: Is there a way to block a consumer on a kafka topic till a new message is posted?Python kafka:有没有办法在发布新消息之前阻止消费者使用 kafka 主题?
【发布时间】:2018-09-10 17:18:04
【问题描述】:

我有一个消费者订阅了一个生产者线程定期发布的测试主题。我希望能够阻止消费者线程,直到有新消息出现 - 然后处理该消息并再次开始等待。我最接近的是:

consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                         bootstrap_servers=[localhost_],
                         api_version=(0, 10), consumer_timeout_ms=1000)
while True:
    print(consumer.poll(timeout_ms=5000))

有没有更惯用的方式(或者这种方式有什么我看不到的严重问题)?

kafka 新手,因此非常欢迎对此设计提出一般性建议。完整(运行)示例:

import time
from threading import Thread

import kafka
from kafka import KafkaProducer, KafkaConsumer

print('python-kafka:', kafka.__version__)

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(str(key), encoding='utf-8')
        value_bytes = bytes(str(value), encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message\n', ex)

localhost_ = 'localhost:9092'

def kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[localhost_],
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    j = 0
    while True:
        publish_message(_producer, topic_name, value=j, key=j)
        j += 1
        time.sleep(5)

if __name__ == '__main__':
    print('Running Producer..')
    topic_name = 'test'
    prod_thread = Thread(target=kafka_producer)
    prod_thread.start()
    consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                             bootstrap_servers=[localhost_],
                             api_version=(0, 10), consumer_timeout_ms=1000)
    # consumer.subscribe([topic_name])
    while True:
        print(consumer.poll(timeout_ms=5000))

python-kafka: 1.3.5

【问题讨论】:

    标签: python python-3.x apache-kafka kafka-python


    【解决方案1】:

    Kafka: The Definitive Guide 也建议在无限循环中轮询。以下是来自Chapter 4. Kafka Consumers: Reading Data from Kafka 的Java 摘录,使用了相同的想法:

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            ...
        }
    }
    

    这很好地转化为建议如何在 Python 中使用这些库。

    kafka-python(参见A Tale of Two Kafka Clients中的完整示例)

    from kafka import KafkaConsumer
    ...
    kafka_consumer = Consumer(
    ...
    )
    consumer.subscribe([topic])
    
    running = True
    while running:
        message = kafka_consumer.poll()
    ...
    

    confluent-kafka-python(参见Introduction to Apache Kafka for Python Programmers中的完整示例)

    from confluent_kafka import Consumer, KafkaError
    ...
    c = Consumer(settings)
    
    c.subscribe(['mytopic'])
    
    try:
        while True:
            msg = c.poll(0.1)
    ...
    

    另一个可能出现的密切相关的问题是您如何处理消息。

    这部分代码可能依赖于外部依赖项(数据库、远程服务、网络文件系统等),这可能导致处理尝试失败。

    因此,实现重试逻辑可能是一个好主意,您可以在博文Retrying consumer architecture in the Apache Kafka 中找到一个很好的描述。

    【讨论】:

    • 注意:问题是使用python
    • @cricket_007 Python 库使用了相同的想法,我也添加了对这些库的引用。
    • 更完整的例子可以在docs.confluent.io/current/clients/…找到
    • @OneCricketeer 这行:for msg in consumer: 是否会像here 那样无限消耗消息?如果不是,是否将其包装在一个 while True 循环下会起作用?
    • @y_159 可以,但是你链接的库不是confluent_kafka,这里使用了poll()
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-11-14
    • 1970-01-01
    • 1970-01-01
    • 2017-09-23
    • 2016-08-14
    • 2022-10-23
    • 1970-01-01
    相关资源
    最近更新 更多