【问题标题】:Recover data sent via websocket恢复通过 websocket 发送的数据
【发布时间】:2020-05-21 21:14:09
【问题描述】:

我使用 Websocket 检索数据以进行进一步处理。 我不知道如何在课堂之外检索它。 我使用线程模块将 websocket 与程序的其余部分分开,这样我就可以运行一个 pyqt5 应用程序,在其中显示处理后的数据,但我无法检索它。

也许我应该使用线程以外的东西,但我不知道。 因为我可以接收大量数据并且有很多工作要做,计算、显示等。我尝试将其优化到最低限度,否则它将永远无法每秒处理我的所有请求。

import websockets
import asyncio
import json
import threading
import time


class WS(object):
    def __init__(self, serveur):
        self.serveur = serveur

    async def connect(self):

        async with websockets.connect(self.serveur) as websocket:
            while True:
                message = await websocket.recv()
                self.data = json.loads(message)
                print(self.data)


uri = "wss://www.bitmex.com/realtime?subscribe=instrument:XBTUSD"
ws = WS(uri)

loop = asyncio.get_event_loop()
th1 = threading.Thread(target=lambda: loop.run_until_complete(ws.connect()))
th1.start()

while True:  # My application that will display and process the data retrieved by the websocket.
    print('blabla')
    time.sleep(3)

【问题讨论】:

    标签: python python-3.x websocket pyqt5 python-asyncio


    【解决方案1】:

    默认情况下,Qt 不支持事件循环,因为在这些情况下使用线程通常是一种有效的解决方法,但在这些情况下,最好使用诸如 qasync(python -m pip install qasync) 和 asyncqt(python -m pip install asyncqt 之类的库)。考虑到这种情况,一个可能的解决方案是使用 Qt 信号来发送信息。

    import asyncio
    import json
    import websockets
    
    from PyQt5 import QtCore, QtWidgets
    from asyncqt import QEventLoop
    
    
    class WS(QtCore.QObject):
        dataChanged = QtCore.pyqtSignal(dict)
    
        def __init__(self, server, parent=None):
            super().__init__(parent)
            self._server = server
    
        @property
        def server(self):
            return self._server
    
        async def connect(self):
    
            async with websockets.connect(self.server) as websocket:
                while True:
                    message = await websocket.recv()
                    data = json.loads(message)
                    self.dataChanged.emit(data)
    
    
    class MainWindow(QtWidgets.QMainWindow):
        def __init__(self, parent=None):
            super().__init__(parent)
    
            self.text_edit = QtWidgets.QTextEdit()
            self.setCentralWidget(self.text_edit)
    
        @QtCore.pyqtSlot(dict)
        def update_data(self, data):
            # only for test
            text = json.dumps(data)
            self.text_edit.setPlainText(text)
    
    
    def main():
        import sys
    
        app = QtWidgets.QApplication(sys.argv)
        loop = QEventLoop(app)
        asyncio.set_event_loop(loop)
    
        uri = "wss://www.bitmex.com/realtime?subscribe=instrument:XBTUSD"
        ws = WS(uri)
    
        w = MainWindow()
        ws.dataChanged.connect(w.update_data)
        w.show()
    
        with loop:
            loop.create_task(ws.connect())
            loop.run_forever()
    
    
    if __name__ == "__main__":
        main()
    

    更新:

    import asyncio
    import json
    import websockets
    
    from PyQt5 import QtCore, QtWidgets
    from asyncqt import QEventLoop, asyncSlot
    
    
    class WS(QtCore.QObject):
        dataChanged = QtCore.pyqtSignal(dict)
    
        def __init__(self, server, parent=None):
            super().__init__(parent)
            self._server = server
            self._websocket = None
    
        @property
        def websocket(self):
            return self._websocket
    
        async def connect(self):
            self._websocket = await websockets.connect(self._server)
            await self.ready_read()
    
        async def ready_read(self):
            while True:
                message = await self.websocket.recv()
                data = json.loads(message)
                self.dataChanged.emit(data)
    
        @asyncSlot(dict)
        async def send_message(self, message):
            data = json.dumps(message)
            await self.websocket.send(data)
    
    
    class MainWindow(QtWidgets.QMainWindow):
        sendMessageSignal = QtCore.pyqtSignal(dict)
    
        def __init__(self, parent=None):
            super().__init__(parent)
    
            self.button = QtWidgets.QPushButton("Press me")
            self.text_edit = QtWidgets.QTextEdit()
    
            central_widget = QtWidgets.QWidget()
            lay = QtWidgets.QVBoxLayout(central_widget)
            lay.addWidget(self.button)
            lay.addWidget(self.text_edit)
            self.setCentralWidget(central_widget)
    
            self.button.clicked.connect(self.on_clicked)
    
        @QtCore.pyqtSlot()
        def on_clicked(self):
            auth_data = {"op": "subscribe", "args": ["instrument:XBTUSD"]}
            self.sendMessageSignal.emit(auth_data)
    
        @QtCore.pyqtSlot(dict)
        def update_data(self, data):
            # only for test
            text = json.dumps(data)
            self.text_edit.setPlainText(text)
    
    
    def main():
        import sys
    
        app = QtWidgets.QApplication(sys.argv)
        loop = QEventLoop(app)
        asyncio.set_event_loop(loop)
    
        uri = "wss://www.bitmex.com/realtime?subscribe=instrument:XBTUSD"
        ws = WS(uri)
    
        w = MainWindow()
        ws.dataChanged.connect(w.update_data)
        w.sendMessageSignal.connect(ws.send_message)
        w.show()
    
        with loop:
            loop.create_task(ws.connect())
            loop.run_forever()
    
    
    if __name__ == "__main__":
        main()
    

    【讨论】:

    • @antho 你想要的不是微不足道的,我建议你首先了解 Asyncio 和 PyQt 的工作原理,当你熟悉这两种技术时,然后尝试使用这两种技术,如我更新的示例所示。如果您添加更多要求,那么我将不再回复,在这种情况下,您将不得不创建一个新帖子来说明您尝试过的内容
    • @antho 我没有链接,但我的建议已经在我之前的评论中指出了。
    猜你喜欢
    • 1970-01-01
    • 2021-06-12
    • 2018-05-14
    • 1970-01-01
    • 2011-06-04
    • 2014-02-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多