【问题标题】:Kafka message key is NoneKafka 消息密钥为无
【发布时间】:2021-09-16 04:17:31
【问题描述】:

这是我的 Kafka 生产者和消费者的实现:

async def produce(topic_name):
    """Produces data into the Kafka Topic"""
    p = Producer({"bootstrap.servers": BROKER_URL})

    curr_iteration = 0
    while True:
        p.produce(topic_name, f"iteration {curr_iteration}".encode("utf-8"))
        curr_iteration += 1
        await asyncio.sleep(0.5)


async def consume(topic_name):
    """Consumes data from the Kafka Topic"""
    c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "0"})
    c.subscribe([topic_name])
    while True:
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value()}")
        await asyncio.sleep(2.5)

message.key()None,得到控制台输出如下:

consumed message None: b'iteration 1'
consumed message None: b'iteration 8'
consumed message None: b'iteration 12'
consumed message None: b'iteration 15'

如何更新代码以获取消息密钥?

【问题讨论】:

    标签: python apache-kafka python-asyncio


    【解决方案1】:

    更新您的生产功能,如下所示,

    p.produce(topic, key="key", value="value")
    
    

    要开始向 Kafka 发送消息,请调用 produce 方法, 传入消息值(可能是None)和optionally a key、分区和回调。生产调用将立即完成 并且不返回值。如果 由于 librdkafka 的本地生产队列,消息无法入队 吃饱了。

    来源:-clients-confluent-kafka-python

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-13
      • 2021-06-24
      • 1970-01-01
      • 1970-01-01
      • 2012-05-15
      相关资源
      最近更新 更多