【问题标题】:How to run a function asynchronously as “fire and forget”?如何以“即发即弃”的方式异步运行函数?
【发布时间】:2021-02-12 06:42:17
【问题描述】:

我有一个 Python Kafka 消费者应用程序,我在其中使用消息,然后同步调用外部 Web 服务。 Web 服务需要一分钟来处理消息并发送响应。

有没有办法消费消息,将请求发送到 Web 服务并消费下一条消息而不等待响应?


from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'spring_test',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')));

这就是我等待消息并发送外部 Web 请求的方式


def consume_msgs():
 for message in consumer:
    message = message.value;
    send('{}'.format(message))

consume_msgs() 

函数send() 需要一分钟才能得到响应。我想同时开始异步消费下一条消息,但我不知道从哪里开始

def send(pload) :
 import requests
 r = requests.post('someurl',data = pload)
 print(r)

【问题讨论】:

  • 你能分享你的发送功能吗?
  • 当然。刚刚编辑了问题
  • 我不打算发布答案,因为我不记得 python 消费者处理批处理的方式与 java 消费者之间是否存在任何重大差异,但我相信如果你制作异步网络使用stackoverflow.com/questions/22190403/… 之类的方式调用一批消息,您可以等待一批消息完成,然后再调用commit
  • 其实貌似做AIO的人也有async kafka消费者github.com/aio-libs/aiokafka

标签: python apache-kafka python-asyncio


【解决方案1】:

不确定这是否是您需要的,但您能否将每个对send 的调用转入一个线程?像下面这样的东西。这样,for 循环将继续,而无需等待 send 返回。如果您使用数据的速度远远快于处理数据的速度,您可能不得不以某种方式限制线程数。

from threading import Thread  


def consume_msgs():
 for message in consumer:
    message = message.value;
    Thread(target=send, args = ('{}'.format(message),)).start()

consume_msgs() 

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-02
    • 1970-01-01
    • 2013-11-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多