【Question Title】: `produce()` only called once in asynchronous kafka implementation?
【Posted】: 2021-12-11 15:46:11
【Question】:

I'm running a simple Kafka produce/consume test on my local machine using Docker.

docker-compose file: https://github.com/confluentinc/cp-all-in-one/blob/6.2.1-post/cp-all-in-one/docker-compose.yml

I've written a simple Python script, shown below:

import json
import random
import asyncio
from collections import namedtuple

from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry import Schema
from confluent_kafka import SerializingProducer, DeserializingConsumer

from faker import Faker
from dataclasses import dataclass, field, asdict

faker = Faker()

registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})


@dataclass
class CIS:
    user_id: str = field(default_factory=faker.user_name)
    question_id: int = field(default_factory=lambda: random.randint(1, 20000))
    is_correct: bool = field(default_factory=lambda: random.choice([True, False]))


async def produce(topic_name, serializer):
    p = SerializingProducer({
        "bootstrap.servers": "PLAINTEXT://localhost:9092",
        "value.serializer": serializer
    })
    while True:
        p.produce(
            topic=topic_name,
            value=CIS(),
        )
        print("put!")
        await asyncio.sleep(1)


async def consume(topic_name, deserializer):
    c = DeserializingConsumer(
        {
            'bootstrap.servers': "PLAINTEXT://localhost:9092",
            # 'key.deserializer': string_deserializer,
            'value.deserializer': deserializer,
            'group.id': "123",
            'auto.offset.reset': "latest"
        }
    )
    c.subscribe([topic_name])
    while True:
        message = c.poll(0.1)
        if message is None:
            print(message)
            continue
        else:
            print(message.value())
        await asyncio.sleep(1)


if __name__ == "__main__":
    topic_name = "my_topic"
    schema_str = json.dumps(
        {
            "type": "record",
            "name": "cis",
            "namespace": "interaction",
            "fields": [
                {"name": "user_id", "type": "string"},
                {"name": "question_id", "type": "int"},
                {"name": "is_correct", "type": "boolean"}
            ]
        }
    )

    def to_dict(obj, ctx):
        return asdict(obj)

    def to_obj(obj, ctx):
        return CIS(
            user_id=obj["user_id"],
            question_id=obj["question_id"],
            is_correct=obj["is_correct"],
        )

    avro_serializer = AvroSerializer(registry_client, schema_str, to_dict)
    avro_deserializer = AvroDeserializer(registry_client, schema_str, to_obj)

    loop = asyncio.get_event_loop()
    t1 = loop.create_task(produce(topic_name, avro_serializer))
    t2 = loop.create_task(consume(topic_name, avro_deserializer))
    loop.run_until_complete(asyncio.gather(t1, t2))

When I run this code, the output is:

>>>
put!
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
....

I don't understand why produce() is only called the first time.

【Comments】:

  • It's the `continue` in `consume`. The only time you actually await is when there is a message; when there isn't one, you print `None` and `continue` straight back to the top of the `while True`, skipping the `await asyncio.sleep(1)`.
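
  The effect described in this comment can be demonstrated without Kafka at all. Below is a minimal sketch using plain asyncio (no broker; `produce_sim`, `consume_buggy`, and `consume_fixed` are made-up stand-ins for the question's coroutines, with bounded loops so the demo terminates): a coroutine that `continue`s past its only `await` monopolizes the event loop, while moving the `await` before the `continue` lets both tasks interleave.

```python
import asyncio


async def produce_sim(log):
    # Stands in for produce(): one "put!" per iteration, then yield.
    for _ in range(3):
        log.append("put!")
        await asyncio.sleep(0)


async def consume_buggy(log):
    # Mirrors the original consume(): when poll() returns None, the
    # `continue` jumps back to the top and skips the await below, so
    # this coroutine never yields control back to the event loop.
    for _ in range(3):
        message = None            # c.poll(0.1) found no message
        if message is None:
            log.append(None)
            continue
        await asyncio.sleep(0)    # only reached when a message arrives


async def consume_fixed(log):
    # Fix: yield on *every* iteration, before the continue.
    for _ in range(3):
        message = None
        await asyncio.sleep(0)
        if message is None:
            log.append(None)
            continue


async def demo(consumer):
    log = []
    await asyncio.gather(produce_sim(log), consumer(log))
    return log


buggy = asyncio.run(demo(consume_buggy))
fixed = asyncio.run(demo(consume_fixed))
print(buggy)  # the consumer runs all its iterations in one burst
print(fixed)  # producer and consumer interleave
```

  In the question's real `while True` loop the starvation is total: once `consume()` takes over, `produce()` never runs again, which matches the single `put!` in the output.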

Tags: python apache-kafka python-asyncio


【Solution 1】:

confluent_kafka is not asyncio-compatible; its calls are blocking.

For asynchronous code I can suggest aiokafka. The project's README contains code snippets showing how to write an asynchronous producer and consumer.

【Discussion】:
