执行此操作的最简单方法就像您提到的将阅读移到循环之外的单独任务中。在这个范例中,您需要使用最新数据更新局部变量,使您的代码看起来像这样:
@app.websocket("/ws")
async def read_webscoket(websocket: WebSocket):
await websocket.accept()
json_data = await websocket.receive_json()
async def read_from_socket(websocket: WebSocket):
nonlocal json_data
async for data in websocket.iter_json():
json_data = data
asyncio.create_task(read_from_socket(websocket))
while True:
print(f"getting weather data for {json_data}")
await asyncio.sleep(1) # simulate a slow call to the weather service
注意,我使用了iter_json 异步生成器,这相当于receive_json 上的无限循环。
这可行,但根据您的要求可能会有错误。想象一下,天气服务需要 10 秒才能完成,在这段时间内,用户通过套接字向不同城市发送了三个请求。在上面的代码中,您只会获得用户发送的最新城市。这对您的应用程序可能没问题,但如果您需要跟踪用户发送的所有内容,则需要使用队列。在此范例中,您将有一项任务读取数据并将其放入队列,一项任务从队列中获取数据并查询天气服务。然后,您将与 gather 同时运行这些。
@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
await websocket.accept()
queue = asyncio.queues.Queue()
async def read_from_socket(websocket: WebSocket):
async for data in websocket.iter_json():
print(f"putting {data} in the queue")
queue.put_nowait(data)
async def get_data_and_send():
data = await queue.get()
while True:
if queue.empty():
print(f"getting weather data for {data}")
await asyncio.sleep(1)
else:
data = queue.get_nowait()
print(f"Setting data to {data}")
await asyncio.gather(read_from_socket(websocket), get_data_and_send())
这样,您就不会丢失用户发送的数据。在上面的示例中,我只获取用户请求的最新天气数据,但您仍然可以访问所有发送的数据。
编辑:要在 cmets 中回答您的问题,队列方法可能最好在新请求进入时取消任务。基本上将您希望能够取消的长时间运行的任务转移到它自己的协程函数中(在这个例如read_and_send_to_client) 并将其作为任务运行。当新数据进来时,如果该任务没有完成,则取消它,然后创建一个新的。
async def read_and_send_to_client(data):
print(f'reading {data} from client')
await asyncio.sleep(10) # simulate a slow call
print(f'finished reading {data}, sending to websocket client')
@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
await websocket.accept()
queue = asyncio.queues.Queue()
async def read_from_socket(websocket: WebSocket):
async for data in websocket.iter_json():
print(f"putting {data} in the queue")
queue.put_nowait(data)
async def get_data_and_send():
data = await queue.get()
fetch_task = asyncio.create_task(read_and_send_to_client(data))
while True:
data = await queue.get()
if not fetch_task.done():
print(f'Got new data while task not complete, canceling.')
fetch_task.cancel()
fetch_task = asyncio.create_task(read_and_send_to_client(data))
await asyncio.gather(read_from_socket(websocket), get_data_and_send())