【发布时间】:2021-12-11 15:46:11
【问题描述】:
我正在使用 docker 在本地机器上做一个简单的 kafka 生产/消费测试。
docker-compose 文件:https://github.com/confluentinc/cp-all-in-one/blob/6.2.1-post/cp-all-in-one/docker-compose.yml
我已经写了一个简单的python代码,如下所示:
import json
import random
import asyncio
from collections import namedtuple
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry import Schema
from confluent_kafka import SerializingProducer, DeserializingConsumer
from faker import Faker
from dataclasses import dataclass, field, asdict
faker = Faker()
registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})
@dataclass
class CIS:
user_id: str = field(default_factory=faker.user_name)
question_id: int = field(default_factory=lambda: random.randint(1, 20000))
is_correct: bool = field(default_factory=lambda: random.choice([True, False]))
async def produce(topic_name, serializer):
p = SerializingProducer({
"bootstrap.servers": "PLAINTEXT://localhost:9092",
"value.serializer": serializer
})
while True:
p.produce(
topic=topic_name,
value=CIS(),
)
print("put!")
await asyncio.sleep(1)
async def consume(topic_name, deserialzier):
c = DeserializingConsumer(
{
'bootstrap.servers': "PLAINTEXT://localhost:9092",
# 'key.deserializer': string_deserializer,
'value.deserializer': deserialzier,
'group.id': "123",
'auto.offset.reset': "latest"
}
)
c.subscribe([topic_name])
while True:
message = c.poll(0.1)
if message is None:
print(message)
continue
else:
print(message.value())
await asyncio.sleep(1)
if __name__ == "__main__":
topic_name = "my_topic"
schema_str = json.dumps(
{
"type": "record",
"name": "cis",
"namespace": "interaction",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "question_id", "type": "int"},
{"name": "is_correct", "type": "boolean"}
]
}
)
def to_dict(obj, ctx):
return asdict(obj)
def to_obj(obj, ctx):
return CIS(
user_id=obj["user_id"],
question_id=obj["question_id"],
is_correct=obj["is_correct"],
)
avro_serializer = AvroSerializer(registry_client, schema_str, to_dict)
avro_deserializer = AvroDeserializer(registry_client, schema_str, to_obj)
loop = asyncio.get_event_loop()
t1 = loop.create_task(produce(topic_name, avro_serializer))
t2 = loop.create_task(consume(topic_name, avro_deserializer))
results = await asyncio.gather(t1, t2)
当我运行这段代码时,输出是:
>>>
put!
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
....
我不明白为什么 produce() 只是第一次被调用。
【问题讨论】:
-
这是
consume中的continue。您唯一等待的时间是有消息时。如果没有,则打印出None并返回while True。
标签: python apache-kafka python-asyncio