【问题标题】:Returning results from async function with infinite loop in Python在 Python 中使用无限循环从异步函数返回结果
【发布时间】:2018-09-01 15:16:12
【问题描述】:

我有一个订阅 websocket 流的异步函数。从这个函数将接收到的数据传递给程序的其他部分的最佳方法是什么?

下面是一个简短的示例代码。它将通过 Bitfinex 交易所的 websocket 订阅 BTCUSD 和 BTCETH 对订单簿通道,并将数据打印到标准输出。我需要分配给message 的数据,以便可以从程序的其他部分检索。

import json
import asyncio
import websockets

async def subscribe(ws_host, subscribe_request):
    async with websockets.connect(ws_host) as ws:
        request = json.dumps(subscribe_request)
        await ws.send(request)
        while True:
            try:
                message = await ws.recv()
                print(message)
            except websockets.exceptions.ConnectionClosed:
                print("Connection was closed")

if __name__ == '__main__':
    ws_host = 'wss://api.bitfinex.com/ws/2'
    subscribe_request_btc = dict( 
                            event='subscribe',
                            channel='book',
                            symbol='tBTCUSD',
                            prec='P0',
                            freq='F1',
                            len='25' 
                            )
    subscribe_request_eth = dict( 
                            event='subscribe',
                            channel='book',
                            symbol='tETHUSD',
                            prec='P0',
                            freq='F1',
                            len='25' 
                            )

    loop = asyncio.get_event_loop()
    tasks = [subscribe(ws_host, subscribe_request_btc), subscribe(ws_host, subscribe_request_eth)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

【问题讨论】:

  • 很多选项。您可以在每条消息上调用一个回调,或者允许客户端注册任意数量的回调,或者将其包装在 Protocol 类中,或者将您的函数重写为异步生成器,或者……没有“最好的方式”适用于所有可能的应用程序。

标签: python python-3.x asynchronous websocket async-await


【解决方案1】:

这是我到目前为止想出的。为每个 websocket 通道创建队列,启动一个监听 websocket 通道的守护线程并将消息写入适当的队列。主线程不断检查队列中是否有新值并将它们出列。

import json
import asyncio
import websockets
import queue
import threading

async def subscribe(ws_host, subscribe_request, q):
    async with websockets.connect(ws_host) as ws:
        request = json.dumps(subscribe_request)
        await ws.send(request)
        while True:
            try:
                message = await ws.recv()
                q.put(message)
            except websockets.exceptions.ConnectionClosed:
                print("Connection was closed")


if __name__ == '__main__':
    ws_host = 'wss://api.bitfinex.com/ws/2'
    subscribe_request_btc = dict( 
                            event='subscribe',
                            channel='book',
                            symbol='tBTCUSD',
                            prec='P0',
                            freq='F1',
                            len='25' 
                            )
    subscribe_request_eth = dict( 
                            event='subscribe',
                            channel='book',
                            symbol='tETHUSD',
                            prec='P0',
                            freq='F1',
                            len='25' 
                            )
    q_btc = queue.Queue()
    q_eth = queue.Queue()
    loop = asyncio.get_event_loop()
    tasks = [subscribe(ws_host, subscribe_request_btc, q_btc), subscribe(ws_host, subscribe_request_eth, q_eth)]
    t1 = threading.Thread(target=loop.run_until_complete, args=(asyncio.wait(tasks),), daemon=True)
    t1.start()
    while True:
        try:
            message_btc = q_btc.get(block=False)
            print('BTC channel: ', message_btc)
        except queue.Empty:
            pass
        try:
            message_eth = q_eth.get(block=False)
            print('ETH channel', message_eth)
        except queue.Empty:
            pass
    loop.close()

随意添加答案,展示更有效的方法。

【讨论】:

    猜你喜欢
    • 2022-12-22
    • 1970-01-01
    • 1970-01-01
    • 2021-10-16
    • 1970-01-01
    • 2020-03-15
    • 2016-03-11
    • 2012-03-30
    • 2019-09-30
    相关资源
    最近更新 更多