【发布时间】:2021-02-12 06:42:17
【问题描述】:
我有一个 Python Kafka 消费者应用程序,我在其中使用消息,然后同步调用外部 Web 服务。 Web 服务需要一分钟来处理消息并发送响应。
有没有办法消费消息,将请求发送到 Web 服务并消费下一条消息而不等待响应?
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'spring_test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')));
这就是我等待消息并发送外部 Web 请求的方式
def consume_msgs():
for message in consumer:
message = message.value;
send('{}'.format(message))
consume_msgs()
函数send() 需要一分钟才能得到响应。我想同时开始异步消费下一条消息,但我不知道从哪里开始
def send(pload) :
import requests
r = requests.post('someurl',data = pload)
print(r)
【问题讨论】:
-
你能分享你的发送功能吗?
-
当然。刚刚编辑了问题
-
我不打算发布答案,因为我不记得 python 消费者处理批处理的方式与 java 消费者之间是否存在任何重大差异,但我相信如果你制作异步网络使用stackoverflow.com/questions/22190403/… 之类的方式调用一批消息,您可以等待一批消息完成,然后再调用
commit。 -
其实貌似做AIO的人也有async kafka消费者github.com/aio-libs/aiokafka
标签: python apache-kafka python-asyncio