【问题标题】:Running several ApplicationSessions non-blockingly using autbahn.asyncio.wamp使用 autbahn.asyncio.wamp 以非阻塞方式运行多个 ApplicationSession
【发布时间】:2017-03-01 15:23:48
【问题描述】:

我正在尝试同时在 python 中运行两个autobahn.asyncio.wamp.ApplicationSessions。以前,我使用this post's answer 中建议的修改高速公路库来做到这一点。我现在 需要更专业的解决方案。

在谷歌上搜索了一段时间后,this post appeared quite promising,但使用 twisted 库,而不是 asyncio。我无法为autobahn 库的asyncio 分支找到类似的解决方案,因为它似乎没有使用Reactors

我遇到的主要问题是 ApplicationRunner.run() 阻塞(这就是我之前将它外包给线程的原因),所以我不能在它之后再运行第二个 ApplicationRunner

我确实需要同时访问 2 个 websocket 通道,而我似乎无法使用单个 ApplicationSession

到目前为止我的代码:

from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio import coroutine
import time


channel1 = 'BTC_LTC'
channel2 = 'BTC_XMR'

class LTCComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('LTCComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel1)
        except Exception as e:
            print("Could not subscribe to topic:", e)

class XMRComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('XMRComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel2)
        except Exception as e:
            print("Could not subscribe to topic:", e)

def main():
    runner = ApplicationRunner("wss://api.poloniex.com:443", "realm1", extra={})
    runner.run(LTCComponent)
    runner.run(XMRComponent) # <- is not being called


if __name__ == "__main__":

    try:
        main()
    except KeyboardInterrupt:
        quit()

    except Exception as e:
        print(time.time(), e)

我对@9​​87654338@ 库的了解有限,恐怕文档并没有太大改善我的情况。我在这里忽略了什么吗?一个函数、一个参数,可以让我组合我的组件或同时运行它们?

也许是与provided here 类似的解决方案,它实现了替代ApplicationRunner


相关主题

Running two ApplicationSessions in twisted

Running Autobahn ApplicationRunner in Thread

Autobahn.wamp.ApplicationSession Source

Autobahn.wamp.Applicationrunner Source


根据要求,来自@stovfl 的回复使用multithreading 代码:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/nils/anaconda3/lib/python3.5/threading.py", line     914, in _bootstrap_inner
    self.run()
  File "/home/nils/git/tools/gemini_wss/t2.py", line 27, in run
    self.appRunner.run(self.__ApplicationSession)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 143,     in run
    transport_factory = WampWebSocketClientFactory(create,         url=self.url,                 serializers=self.serializers)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     319, in __init__
    WebSocketClientFactory.__init__(self, *args, **kwargs)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     268, in __init__
    self.loop = loop or asyncio.get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 626, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 572, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-2'.
Exception in thread Thread-1:
**Same as in Thread-2**
...
RuntimeError: There is no current event loop in thread 'Thread-1'.

【问题讨论】:

    标签: python python-3.x python-asyncio autobahn wamp-protocol


    【解决方案1】:

    正如我从traceback 看到的,我们只达到第 2 步,共 4 步

    来自 asyncio 文档:
    该模块提供了使用协程编写单线程并发代码的基础设施,通过套接字和其他资源多路复用 I/O 访问

    所以我使用multithreading 放弃了我的第一个提案。
    我可以想象以下三个选项:

    1. multiprocessing 代替multithreading
    2. asyncio loop 内使用coroutine 进行操作
    3. channelsdef onJoin(self, details) 之间切换

    第二个提案,第一个选项使用multiprocessing
    我可以启动两个asyncio loops,所以appRunner.run(...) 应该可以工作。

    如果channel 是唯一不同的,您可以使用一个 class ApplicationSession。 如果您需要传递不同的class ApplicationSession,请将其添加到args=

    class __ApplicationSession(ApplicationSession):
            # ...
            try:
                yield from self.subscribe(onTicker, self.config.extra['channel'])
            except Exception as e:
                # ...
    
    import multiprocessing as mp
    import time
    
    def ApplicationRunner_process(realm, channel):
            appRunner = ApplicationRunner("wss://api.poloniex.com:443", realm, extra={'channel': channel})
            appRunner.run(__ApplicationSession)
    
    if __name__ == "__main__":
        AppRun = [{'process':None, 'channel':'BTC_LTC'},
                  {'process': None, 'channel': 'BTC_XMR'}]
    
        for app in AppRun:
            app['process'] =  mp.Process(target = ApplicationRunner_process, args = ('realm1', app['channel'] ))
            app['process'].start()
            time.sleep(0.1)
    
        AppRun[0]['process'].join()
        AppRun[1]['process'].join()
    

    【讨论】:

    • 嗯,你是对的——那句话没有多大意义。 run() 方法确实 阻塞。我指的是覆盖信号调用并在线程中启动单独事件循环的 hacky 解决方案。可能在编辑时迷路了。
    • 不幸的是,由于线程中没有当前的事件循环,这仍然引发了RuntimeError
    • 我们非常接近 - 代码运行,但当我尝试挂接到第二个频道时似乎出现错误消息。 2017-03-16T08:07:33 wamp.error.not_authorized: internal error: realm auto-activation not yet implemented 是对此的回应。这是代码问题还是 API 问题?谷歌搜索没有产生有意义的结果。
    • 更新了我的答案以涵盖这一点。请评论您使用的是 one class ApplicationSession 还是 不同
    • ApplicationSession 的一个类定义。使用 extras 属性效果很好。
    【解决方案2】:

    按照you linked for twisted 的方法,我设法通过异步设置获得了相同的行为start_loop=False

    import asyncio
    from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
    
    runner1 = ApplicationRunner(url, realm, extra={'cli_id': 1})
    coro1 = runner1.run(MyApplicationSession, start_loop=False)
    
    runner2 = ApplicationRunner(url, realm, extra={'cli_id': 2})
    coro2 = runner2.run(MyApplicationSession, start_loop=False)
    
    asyncio.get_event_loop().run_until_complete(coro1)
    asyncio.get_event_loop().run_until_complete(coro2)
    asyncio.get_event_loop().run_forever()
    
    class MyApplicationSession(ApplicationSession):
    
        def __init__(self, cfg):
            super().__init__(cfg)
            self.cli_id = cfg.extra['cli_id']
    
       def onJoin(self, details):
            print("session attached", self.cli_id)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-10-29
      • 2015-03-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多