【发布时间】:2021-09-16 04:17:31
【问题描述】:
这是我的 Kafka 生产者和消费者的实现:
async def produce(topic_name):
"""Produces data into the Kafka Topic"""
p = Producer({"bootstrap.servers": BROKER_URL})
curr_iteration = 0
while True:
p.produce(topic_name, f"iteration {curr_iteration}".encode("utf-8"))
curr_iteration += 1
await asyncio.sleep(0.5)
async def consume(topic_name):
"""Consumes data from the Kafka Topic"""
c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "0"})
c.subscribe([topic_name])
while True:
message = c.poll(1.0)
if message is None:
print("no message received by consumer")
elif message.error() is not None:
print(f"error from consumer {message.error()}")
else:
print(f"consumed message {message.key()}: {message.value()}")
await asyncio.sleep(2.5)
message.key() 是None,得到控制台输出如下:
consumed message None: b'iteration 1'
consumed message None: b'iteration 8'
consumed message None: b'iteration 12'
consumed message None: b'iteration 15'
如何更新代码以获取消息密钥?
【问题讨论】:
标签: python apache-kafka python-asyncio