【问题标题】:Implement Kafka Producer Consumer Chat with Queue使用队列实现 Kafka 生产者消费者聊天
【发布时间】:2020-12-03 13:34:08
【问题描述】:

我已经使用 python 创建了带有 Topic 的实现的 Kafka 生产者-消费者消息传递。我怎样才能对 Queue 做同样的事情,以便消息只会发送给单个消费者。

这是我的生产者代码

# Import KafkaProducer from Kafka library
from kafka import KafkaProducer

# Define server with port
bootstrap_servers = ['localhost:9092']

# Define topic name where the message will publish
topicName = 'First_Topic'

# Initialize producer variable
producer = KafkaProducer(bootstrap_servers = bootstrap_servers)
i=1
for i in range(100):
# Publish text in defined topic
    message_data = input("Enter message ")

    producer.send(topicName,str.encode(message_data)  )

# Print message
    print("Message Sent")
i=i+1

这是我的消费者代码。

from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
consumer.subscribe(['First_Topic'])

for msg in consumer:
    print("Message from Producer on Topic - "+msg.topic+":"+ msg.value.decode())

【问题讨论】:

  • 您的消费者代码看起来不错。有什么问题?

标签: python apache-kafka kafka-python


【解决方案1】:

当生产者在 Kafka 主题上生成消息时,它不会直接发送给消费者,而是会首先发送到特定分区上的 Kafka 代理(类似于持久分布式队列)。

消费者收到消息的方式和数量取决于消费者的设置。 根据您的订阅方案,一个或多个消费者可以订阅相同的主题/分区。如果您使用不同的消费者 groupId(或根本没有组 ID)启动多个消费者,那么所有消费者将获得相同的数据。 如果所有消费者使用相同的组 ID,则只有一个消费者会从一个分区获取消息。

【讨论】:

    猜你喜欢
    • 2012-01-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多