【问题标题】:How to consume messages from kafka producer in batches (kafka-python)如何批量消费来自kafka生产者的消息(kafka-python)
【发布时间】:2022-02-14 13:41:25
【问题描述】:

我在 python 中有一个 kafka 生产者和消费者。我希望批量消费来自 kafka 生产者的消息,比如说 2。从生产者那里,我一直在发送如下电子邮件数据:

[{
    "email" : "sukhi215c@gmail.com",
    "subject": "Test 1",
    "message" : "this is a test"
},
{
    "email" : "sukhi215c@gmail.com",
    "subject": "Test 2",
    "message" : "this is a test"   
},
{
    "email" : "sukhi215c@gmail.com",
    "subject": "Test 3",
    "message" : "this is a test"   
},
{
    "email" : "sukhi215c@gmail.com",
    "subject": "Test 4",
    "message" : "this is a test"   
}]

我正在尝试批量使用这些数据。我希望一次使用 2 条消息并根据这 2 条数据发送电子邮件并使用下一组数据。我尝试的解决方法是:

consumer = KafkaConsumer(topic, bootstrap_servers=[server], api_version=(0, 10))
for message in consumer[:2]:
    string = message.value.decode("utf-8")
    dict_value = ast.literal_eval(string)

我得到的错误是:

    for message in consumer[:2]:
TypeError: 'KafkaConsumer' object is not subscriptable

有人可以帮我解决这个问题吗?

【问题讨论】:

    标签: apache-kafka kafka-python


    【解决方案1】:

    消费者不是一个集合;它的迭代器是无限的。

    如果您想每两个事件执行一次操作,请使用计数器或您自己的列表

    data = []
    consumer = KafkaConsumer(topic, bootstrap_servers=[server], api_version=(0, 10))
    for message in consumer:
        data.append(message)
        if len(data) >= 2:
            action(data)
            data.clear()
    

    【讨论】:

    • 感谢您的回复。我正在使用多线程来拥有两个线程,一个用于批量消费消息,一个用于发送电子邮件。我希望仅在电子邮件线程完成发送电子邮件后才使用下一批消息。知道如何实现吗?
    • 如果你想阻塞消费者,那么没有理由使用单独的线程
    • 对不起,我错了。我的意思是我必须同时使用消息和发送电子邮件。这个变通办法可以解决这个问题吗?
    • 我不明白为什么它不会。唯一的缺点是如果您收到奇数条消息,如果等待时间过长,它可能会丢弃最后一条消息
    • 但是在这个我们实际上消耗了所有的消息,但是我们只是一分为二地处理它。但我真正想要的是只消耗 2 个数据
    猜你喜欢
    • 2019-07-01
    • 2021-04-24
    • 2023-01-21
    • 1970-01-01
    • 1970-01-01
    • 2020-08-24
    • 2022-08-16
    • 1970-01-01
    • 2023-03-08
    相关资源
    最近更新 更多