【问题标题】:What is the best practice for keeping Kafka consumer alive in python?在 python 中保持 Kafka 消费者存活的最佳实践是什么?
【发布时间】:2020-09-10 02:34:17
【问题描述】:

在保持消费者活力方面,有些事情让我感到困惑。假设我有一个不断写入数据的主题。但是,在一天中的一个小时内,没有新消息。如果我为消费者设置了超时,当没有新消息时,消费者将关闭。

现在,新消息到达。但是,没有活着的消费者来消费它们。

我应该如何处理这种情况?我的消费者可能会消费所有消息并关闭。让他们活着的最好方法是什么?有没有办法在新消息到达时自动调用它们?此类场景的最佳做法是什么?

【问题讨论】:

    标签: python python-3.x apache-kafka kafka-consumer-api


    【解决方案1】:

    为什么不直接

    import time
    from confluent_kafka import Consumer
    
    
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-consumer-1',
        'auto.offset.reset': 'earliest'
    })
    consumer.subscribe(['topicName'])
    
    while True:
        try: 
            message = consumer.poll(10.0)
    
            if not message:
                time.sleep(120) # Sleep for 2 minutes
    
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
    
            print(f"Received message: {msg.value().decode('utf-8')}")
        except:
            # Handle any exception here
            ...
        finally:
            consumer.close()
            print("Goodbye")
    

    我无法评论“为消费者设置超时”的要求,但在大多数情况下,消费者应该“永远”运行,也应该以某种方式添加到消费者组中它们高度可用。

    【讨论】:

    • 准确地说,我希望它永远运行。那么为什么我的消费者代码中有 consumer.close() 呢?在失败的情况下更快地触发重新平衡?是否有任何以守护程序模式运行消费者的最佳实践?
    • 在@Giorgos Myrianthous 的上述示例代码中,您的消费者将永远运行并且不会到达 consumer.close() 除非发生异常或崩溃。这正是您想要实现的目标。
    【解决方案2】:

    使用生成器函数

    def consumableMessages(self):
        self.kafka.subscribe(self.topic)
        try:
            for message in self.kafka:
                yield message.value.decode("utf-8")
        except KeyboardInterrupt:
            self.kafka.close()
    

    然后我们可以等待消息:

    for message in kafka.consumableMessages():
        print(message)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-09-29
      • 1970-01-01
      • 1970-01-01
      • 2013-03-22
      • 2021-04-21
      • 2011-08-15
      • 2017-04-28
      • 1970-01-01
      相关资源
      最近更新 更多