【问题标题】:How do you extract data from multiple websocket streams to use in a formula?如何从多个 websocket 流中提取数据以在公式中使用?
【发布时间】:2021-07-26 09:42:40
【问题描述】:

以下代码试图从“连接”集中获取流消息。我试图弄清楚如何从每个流中提取两个浮点值来计算商。聚合的流消息看起来是无序的和非连续的,如果我声明全局变量并尝试迭代地填充它们,我的例程就会停止。我希望能够连接到多个流并使用消息数据生成实时计算值/结果。如果可以的话请帮忙。我是 python 和 asyncio 的新手。

代码如下:

import asyncio
import websockets
import json

connections = set()
connections.add("mystream1")
connections.add("mystream2")

async def handle_socket1(url):
    async with websockets.connect(url) as websocket1:
        async for message in websocket1:
            json_msg = json.loads(message)
            if json_msg["X"] == "A":
                value1 = float(json_msg["b"])
            if json_msg["Y"] == "B":
                value2 = float(json_msg["a"])
            print(value2/value1)

async def handler1():
    await asyncio.wait([handle_socket1(url) for url in connections])

【问题讨论】:

  • 您发布的代码是计算单个流的商。您希望如何计算两个不同流的商?你想将json_msg["a"] 与另一个流中的json_msg["b"] 分开吗?
  • 另外,当一个值从一个流到达,但(还没有)从另一个流到达时会发生什么:应该立即重新计算商,还是代码应该等待新值在两个流中到达流?
  • 我想这解释了为什么我总是得到 0 的流值之一!理想情况下,我想同时连接到两个流,然后使用每个最新流中的一条数据计算最新商。我想不断刷新商,就好像它是一个实时值。

标签: python websocket async-await python-asyncio


【解决方案1】:

您可以将两个流汇集到一个队列中,并有一个单独的函数将任一值拉出队列并重新计算商:

async def handle_stream(url, identifier, queue):
    async with websockets.connect(url) as websocket1:
        async for message in websocket1:
            json_msg = json.loads(message)
            await queue.put((identifier, json_msg["value"]))

async def calculate(queue):
    value_a = value_b = None
    while True:
        identifier, value = await queue.get()
        if identifier == 'A':
            value_a = value
        else:
            value_b = value
        if value_a is not None and value_b is not None:
            print(value_a / value_b)

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        handle_stream("mystream1", "A", queue),
        handle_stream("mystream2", "B", queue),
        calculate(queue),
    )

【讨论】:

  • 谢谢!我认为我们正在接近我需要的东西。我一定是做错了什么,因为即使在我插入所有流、键和值之后,当我运行你的代码时也没有任何反应。澄清一下:我需要将 B 除以 A,其中 A 是来自 stream1 的值,其中 key = X,B 是来自 stream2 的值,其中 key = Y。另外,我对你的一些变量有点模糊:队列/identifier/json_msg["value"](在句柄流中)和值(在计算中)。提前为我的 python 新手道歉。附言我到处测试了打印语句,但没有得到任何输出。很奇怪。
  • @Query13 如果你没有得到输出,那么我猜安装失败了。原始代码运行了吗?至于变量,我不确定你的问题是什么。变量名称是否令人困惑,或者您不了解不同函数中变量的用途?你了解队列(有时也称为通道)在编程中的作用吗?
  • 是的,原始代码有效。我以你的为例,从头开始逐行添加,看看问题出在哪里。我会绕回来。谢谢。
  • 谢谢 user4815162342。我设法根据您建议的代码生成所需的结果。如果您有兴趣,我已经发布了另一个关于当我 Ctrl+C 退出循环时返回的异常的问题。
猜你喜欢
  • 2023-02-04
  • 2020-08-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多