【问题标题】:How to Subscribe to multiple Websocket streams using Muiltiprocessing如何使用多处理订阅多个 Websocket 流
【发布时间】:2021-06-07 05:24:09
【问题描述】:

我不熟悉在 python 中处理多处理、多线程等。

我正在尝试使用 multiprocessing 从我的加密交换 (API Docs Here) 订阅多个 Websocket 流。 但是,当我运行下面的代码时,我只收到ticker information,而不是order book updates

如何修复代码以获取这两个信息?
multiprocessing 上运行时似乎只有一个 websocket 工作的原因是什么?

(当我分别运行ws_orderBookUpdates()ws_tickerInfo() 函数时,不使用multiprocessing,它单独运行良好,因此不是交易所的问题。)

import websocket
import json
import pprint
from datetime import datetime
import time

# Function to subscribe to ticker information.
def ws_tickerInfo():
    def on_open(self):
        print("opened")
        subscribe_message = {
            "method": "subscribe",
            "params": {'channel': "lightning_ticker_BTC_JPY"}
        }
        ws.send(json.dumps(subscribe_message))

    def on_message(self, message, prev=None):
        print(f"Ticker Info, Received : {datetime.now()}")

        ###### full json payloads ######
        # pprint.pprint(json.loads(message))

    def on_close(self):
        print("closed connection")

    endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    ws = websocket.WebSocketApp(endpoint,
                                on_open=on_open,
                                on_message=on_message,
                                on_close=on_close)

    ws.run_forever()


# Function to subscribe to order book updates.
def ws_orderBookUpdates():
    def on_open(self):
        print("opened")
        subscribe_message = {
            "method": "subscribe",
            "params": {'channel': "lightning_board_BTC_JPY"}
        }
        ws.send(json.dumps(subscribe_message))

    def on_message(self, message):
        print(f"Order Book, Received : {datetime.now()}")

        ###### full json payloads ######
        # pprint.pprint(json.loads(message))

    def on_close(self):
        print("closed connection")

    endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    ws = websocket.WebSocketApp(endpoint,
                                on_open=on_open,
                                on_message=on_message,
                                on_close=on_close)
    ws.run_forever()


# Multiprocessing two functions
if __name__ == '__main__':
    import multiprocessing as mp

    mp.Process(target=ws_tickerInfo(), daemon=True).start()
    mp.Process(target=ws_orderBookUpdates(), daemon=True).start()

【问题讨论】:

    标签: python websocket multiprocessing algorithmic-trading cryptoapi


    【解决方案1】:

    更新

    您已经创建了两个 daemon 进程。当所有非守护进程进程终止时,它们将终止,在这种情况下,主进程是在创建守护进程后立即终止。您很幸运,即使其中一个过程也有机会产生输出,但为什么要冒险呢? 不要使用 dameon 进程。而是:

    if __name__ == '__main__':
        import multiprocessing as mp
    
        p1 = mp.Process(target=ws_tickerInfo)
        p2 = mp.Process(target=ws_orderBookUpdates)
        p1.start()
        p2.start()
        p1.join() # wait for completion
        p2.join() # wait for completion
    

    但真正的问题是盯着我们的脸,我们都错过了!你有:

        p1 = mp.Process(target=ws_tickerInfo(), daemon=True)
        p2 = mp.Process(target=ws_orderBookUpdates(), daemon=True)
    

    应该是什么时候:

        p1 = mp.Process(target=ws_tickerInfo)
        p2 = mp.Process(target=ws_orderBookUpdates)
    

    看到区别了吗?实际上,您并没有将函数 ws_tickerInfo 传递给 Process,而是 调用 ws_tickerInfo 并尝试传递返回值,如果函数曾经返回,这将是荒谬的 None(它没有)。因此,您甚至都无法执行第二个流程创建语句。

    您可能也为此使用了多线程而不是多处理,尽管 Ctrl-C 中断处理程序可能不起作用(见下文)。还应该有一种终止程序的机制。我添加了一些代码来检测 Ctrl-C 并在输入时终止。此外,您使用self 作为函数参数,就好像该函数实际上是一个类方法,但事实并非如此。这不是好的编程风格。这是更新的来源:

    import websocket
    import json
    import pprint
    from datetime import datetime
    import time
    import sys
    import signal
    
    # Function to subscribe to ticker information.
    def ws_tickerInfo():
        def on_open(wsapp):
            print("opened")
            subscribe_message = {
                "method": "subscribe",
                "params": {'channel': "lightning_ticker_BTC_JPY"}
            }
            wsapp.send(json.dumps(subscribe_message))
    
        def on_message(wsapp, message, prev=None):
            print(f"Ticker Info, Received : {datetime.now()}")
    
            ###### full json payloads ######
            # pprint.pprint(json.loads(message))
    
        def on_close(wsapp):
            print("closed connection")
    
        endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
        ws = websocket.WebSocketApp(endpoint,
                                    on_open=on_open,
                                    on_message=on_message,
                                    on_close=on_close)
    
        ws.run_forever()
    
    
    # Function to subscribe to order book updates.
    def ws_orderBookUpdates():
        def on_open(wsapp):
            print("opened")
            subscribe_message = {
                "method": "subscribe",
                "params": {'channel': "lightning_board_BTC_JPY"}
            }
            wsapp.send(json.dumps(subscribe_message))
    
        def on_message(wsapp, message):
            print(f"Order Book, Received : {datetime.now()}")
    
            ###### full json payloads ######
            # pprint.pprint(json.loads(message))
    
        def on_close(wsapp):
            print("closed connection")
    
        endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
        ws = websocket.WebSocketApp(endpoint,
                                    on_open=on_open,
                                    on_message=on_message,
                                    on_close=on_close)
        ws.run_forever()
    
    def handle_ctrl_c(signum, stack_frame):
        sys.exit(0)
    
    if __name__ == '__main__':
        import multiprocessing as mp
    
        signal.signal(signal.SIGINT, handle_ctrl_c) # terminate on ctrl-c
        print('Enter Ctrl-C to terminate.')
        p1 = mp.Process(target=ws_tickerInfo)
        p2 = mp.Process(target=ws_orderBookUpdates)
        p1.start()
        p2.start()
        p1.join() # wait for completion (will never happen)
        p2.join() # wait for completion (will never happen)
    

    使用多线程

    if __name__ == '__main__':
        import threading
    
        t1 = threading.Thread(target=ws_tickerInfo, daemon=True)
        t2 = threading.Thread(target=ws_orderBookUpdates, daemon=True)
        t1.start()
        t2.start()
        input('Hit enter to terminate...\n')
    

    【讨论】:

    • 感谢您的评论,我修复了代码,但我仍然只能得到 ws_tickerInfo() 流媒体,所以可能不是因为 deamon=True
    • 我想通了——我应该早点看到的。
    • 您的回答正是我所需要的!也感谢您联系multithreading。您如何区分何时使用multiprocessingmultithreading?我理解的方式是我的 Websocket 示例不需要太多的计算能力(它只是接收数据)所以我宁愿使用multithreading。如果还有什么我应该注意的,非常感谢进一步的建议。
    • 没错。由于ws_tickerInfows_orderBookUpdates 大多处于等待状态,等待数据到达,当它们最终有数据时,它们使用最少的 CPU 资源来处理它,它们不会长时间保持全局解释器锁,因此给彼此奔跑的机会。对答案表示赞赏(假设是您),但接受答案对所有相关人员来说都更好。
    猜你喜欢
    • 2019-03-25
    • 1970-01-01
    • 2021-11-19
    • 1970-01-01
    • 2016-06-18
    • 2019-06-16
    • 2016-04-27
    • 2021-07-29
    • 2021-08-03
    相关资源
    最近更新 更多