【问题标题】:how to reconnect after autobahn websocket timeout?高速公路 websocket 超时后如何重新连接?
【发布时间】:2017-02-16 09:49:18
【问题描述】:

我正在使用高速公路连接到这样的 websocket。

class MyComponent(ApplicationSession):

  @inlineCallbacks
  def onJoin(self, details):
    print("session ready")

    def oncounter(*args, **args2):
        print("event received: args: {} args2: {}".format(args, args2))

    try:
        yield self.subscribe(oncounter, u'topic')
        print("subscribed to topic")
    except Exception as e:
        print("could not subscribe to topic: {0}".format(e))

if __name__ == '__main__':
  addr = u"wss://mywebsocketaddress.com"
  runner = ApplicationRunner(url=addr, realm=u"realm1", debug=False, debug_app=False)
  runner.run(MyComponent)

这很好用,我可以接收消息。但是,大约 3-4 小时后,有时会更快,消息突然停止发送。似乎 websocket 超时(会发生这种情况吗?),可能是由于连接问题。

发生这种情况时如何自动重新连接高速公路?


这是我的尝试,但从未调用重新连接代码。

class MyClientFactory(ReconnectingClientFactory, WampWebSocketClientFactory):

    maxDelay = 10
    maxRetries = 5

    def startedConnecting(self, connector):
        print('Started to connect.')

    def clientConnectionLost(self, connector, reason):
        print('Lost connection. Reason: {}'.format(reason))
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

    def clientConnectionFailed(self, connector, reason):
        print('Connection failed. Reason: {}'.format(reason))
        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)

class MyApplicationRunner(object):

    log = txaio.make_logger()

    def __init__(self, url, realm, extra=None, serializers=None,
                 debug=False, debug_app=False,
                 ssl=None, proxy=None):

        assert(type(url) == six.text_type)
        assert(realm is None or type(realm) == six.text_type)
        assert(extra is None or type(extra) == dict)
        assert(proxy is None or type(proxy) == dict)
        self.url = url
        self.realm = realm
        self.extra = extra or dict()
        self.serializers = serializers
        self.debug = debug
        self.debug_app = debug_app
        self.ssl = ssl
        self.proxy = proxy

    def run(self, make, start_reactor=True):
        if start_reactor:
            # only select framework, set loop and start logging when we are asked
            # start the reactor - otherwise we are running in a program that likely
            # already tool care of all this.
            from twisted.internet import reactor
            txaio.use_twisted()
            txaio.config.loop = reactor

            if self.debug or self.debug_app:
                txaio.start_logging(level='debug')
            else:
                txaio.start_logging(level='info')

        isSecure, host, port, resource, path, params = parseWsUrl(self.url)

        # factory for use ApplicationSession
        def create():
            cfg = ComponentConfig(self.realm, self.extra)
            try:
                session = make(cfg)
            except Exception as e:
                if start_reactor:
                    # the app component could not be created .. fatal
                    self.log.error(str(e))
                    reactor.stop()
                else:
                    # if we didn't start the reactor, it's up to the
                    # caller to deal with errors
                    raise
            else:
                session.debug_app = self.debug_app
                return session

        # create a WAMP-over-WebSocket transport client factory
        transport_factory = MyClientFactory(create, url=self.url, serializers=self.serializers,
                                                       proxy=self.proxy, debug=self.debug)

        # supress pointless log noise like
        # "Starting factory <autobahn.twisted.websocket.WampWebSocketClientFactory object at 0x2b737b480e10>""
        transport_factory.noisy = False

        # if user passed ssl= but isn't using isSecure, we'll never
        # use the ssl argument which makes no sense.
        context_factory = None
        if self.ssl is not None:
            if not isSecure:
                raise RuntimeError(
                    'ssl= argument value passed to %s conflicts with the "ws:" '
                    'prefix of the url argument. Did you mean to use "wss:"?' %
                    self.__class__.__name__)
            context_factory = self.ssl
        elif isSecure:
            from twisted.internet.ssl import optionsForClientTLS
            context_factory = optionsForClientTLS(host)

        from twisted.internet import reactor
        if self.proxy is not None:
            from twisted.internet.endpoints import TCP4ClientEndpoint
            client = TCP4ClientEndpoint(reactor, self.proxy['host'], self.proxy['port'])
            transport_factory.contextFactory = context_factory
        elif isSecure:
            from twisted.internet.endpoints import SSL4ClientEndpoint
            assert context_factory is not None
            client = SSL4ClientEndpoint(reactor, host, port, context_factory)
        else:
            from twisted.internet.endpoints import TCP4ClientEndpoint
            client = TCP4ClientEndpoint(reactor, host, port)

        d = client.connect(transport_factory)

        # as the reactor shuts down, we wish to wait until we've sent
        # out our "Goodbye" message; leave() returns a Deferred that
        # fires when the transport gets to STATE_CLOSED
        def cleanup(proto):
            if hasattr(proto, '_session') and proto._session is not None:
                if proto._session.is_attached():
                    return proto._session.leave()
                elif proto._session.is_connected():
                    return proto._session.disconnect()

        # when our proto was created and connected, make sure it's cleaned
        # up properly later on when the reactor shuts down for whatever reason
        def init_proto(proto):
            reactor.addSystemEventTrigger('before', 'shutdown', cleanup, proto)
            return proto

        # if we connect successfully, the arg is a WampWebSocketClientProtocol
        d.addCallback(init_proto)

        # if the user didn't ask us to start the reactor, then they
        # get to deal with any connect errors themselves.
        if start_reactor:
            # if an error happens in the connect(), we save the underlying
            # exception so that after the event-loop exits we can re-raise
            # it to the caller.

            class ErrorCollector(object):
                exception = None

                def __call__(self, failure):
                    self.exception = failure.value
                    reactor.stop()
            connect_error = ErrorCollector()
            d.addErrback(connect_error)

            # now enter the Twisted reactor loop
            reactor.run()

            # if we exited due to a connection error, raise that to the
            # caller
            if connect_error.exception:
                raise connect_error.exception

        else:
            # let the caller handle any errors
            return d

我得到的错误:

2016-10-09T21:00:40+0100 与 tcp4:xxx.xx.xx.xx:xxx 的连接以非干净方式丢失:连接丢失 2016-10-09T21:00:40 + 0100 _connectionLost:[失败实例:回溯(无帧失败)::与另一端的连接以非干净方式丢失:连接 l 原声带 ] 2016-10-09T21:00:40+0100 WAMP-over-WebSocket 传输丢失:wasClean=False,代码=1006,原因=“连接被不干净地关闭(对等方在没有先前 WebSocket 关闭握手的情况下丢弃了 TCP 连接)” 2016-10-09T21:10:39+0100 例外:未收到任何消息 2016-10-09T21:10:39+0100 Traceback(最近一次通话最后):

【问题讨论】:

    标签: python twisted autobahn


    【解决方案1】:

    你试过pip3 install autobahn-autoreconnect吗?

    # from autobahn.asyncio.wamp import ApplicationRunner
    from autobahn_autoreconnect import ApplicationRunner
    

    【讨论】:

      【解决方案2】:

      这是一个自动重新连接的 ApplicationRunner 的example。启用自动重新连接的重要行是:

      runner.run(session, auto_reconnect=True)
      

      您还需要激活自动 WebSocket ping/pong(例如在 Crossbar.io 中),以 a) 最大限度地减少因超时而导致的连接丢失,以及 b) 允许快速检测丢失的连接。

      【讨论】:

        【解决方案3】:

        如果您使用的是 Twisted,则可以使用 ReconnectingClientFactoryThere's a simple example by the autobahn developer on github。不幸的是,似乎没有预先构建具有此功能的ApplicationRunner 的实现,但您自己实现看起来并不难。 Here's an asyncio variant 应该直指 Twisted。您可以关注this issue,因为开发团队似乎希望合并重新连接客户端。

        【讨论】:

        • 我只知道如何使用twisted 禁用SSL 验证,所以我坚持这样做。 (您链接了我的(bizso09)Github comment here。我已经编辑了问题以尝试实现 ReconnectingClientFactory,但从未调用过重新连接代码。您能看一下吗?非常感谢
        • 我注意到的一件事是在工厂类中没有调用重置重新连接延迟(即self.resetDelay())。您可能必须这样做以提示 Twisted 尽快尝试重新连接,而不是间隔等待。我不确定这是否是重新连接没有发生的原因
        • 我明白了。另一个问题是方法clientConnectionLost 从未被调用过。正如您在错误输出日志中看到的那样,打印语句不存在。我在源代码中跟踪打印的错误,它通过调用会话的onClose 方法的WampWebSocketClientProtocol 类。这似乎完全绕过了 ReconnectingFactory?另外,在我的情况下,似乎是服务器强制断开连接,但这不重要吗?
        • 这很奇怪。我刚刚尝试了重新连接代码的示例,它对我来说很好。您的日志说它没有完全断开,这意味着连接突然丢失。也许有防火墙规则来拆除空闲连接?
        • 如果我断开互联网连接,它似乎会重新连接。但是,我仍然在其他时间收到上述错误消息。我认为这可能与服务器重启/部署有关。如果我想在我的Component 类中处理onClose 并重新连接,我应该调用什么?我可以启动另一个ApplicationRunner 实例吗?似乎主要的扭曲循环线程被称为reactor 并且它是全局的,所以不确定两次调用应用程序运行器如何影响它。目前,我的解决方案是在检测到连接断开时使用操作系统级别的命令重新启动脚本,这很 hacky。
        猜你喜欢
        • 2017-06-04
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-10-06
        • 1970-01-01
        相关资源
        最近更新 更多