【问题标题】:Urwid, autobahn and twisted eventloop integration(Urwid、Autobahn 与 Twisted 事件循环的集成)
【发布时间】:2014-08-19 05:50:32
【问题描述】:

我正在使用 Autobahn 连接到服务器并接收"push"(推送)通知,并且我想借助 Twisted 的事件循环制作一个简单的 urwid 界面。但是我不确定从我的 Autobahn 处理程序类中设置 urwid 文本的最佳方法是什么。在下面的代码中,您可以看到当前的实现:我想从"MyFrontendComponent" 类内部调用"updateText" 方法。这样做的最佳方法是什么?

import urwid
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.endpoints import clientFromString

from autobahn.twisted import wamp, websocket
from autobahn.wamp import types
from autobahn.wamp.serializer import *


class MyFrontendComponent(wamp.ApplicationSession, object):
    """WAMP client session that calls the time service and listens to a topic.

    Status messages are forwarded to the user interface when the session was
    configured with an application object (``ComponentConfig(extra=app)``)
    that exposes ``updateText``.  Without such an object the messages are
    printed, which is what this class did originally.
    """

    def _notify(self, message):
        """Deliver *message* to the UI if one is attached, else print it.

        Printing while urwid owns the terminal writes over the rendered
        screen, so the ``updateText`` route is preferred whenever available.
        """
        config = getattr(self, 'config', None)
        app = getattr(config, 'extra', None)
        update = getattr(app, 'updateText', None)
        if callable(update):
            update(message)
        else:
            print(message)

    @inlineCallbacks
    def onJoin(self, details):
        """Call the remote time procedure, then subscribe to the event topic.

        ``details`` is supplied by the WAMP layer and is not used here.
        """
        ## call a remote procedure
        try:
            now = yield self.call(u'com.timeservice.now')
        except Exception as e:
            # Deliberately broad: any failure of the remote call is reported
            # and the session carries on to the subscription below.
            self._notify("Error: {}".format(e))
        else:
            self._notify("Current time from time service: {}".format(now))

        ## subscribe to a topic
        self.received = 0

        def on_event(i):
            # Report each event; leave the session after the sixth one.
            self._notify("Got event: {}".format(i))
            self.received += 1
            if self.received > 5:
                self.leave()

        sub = yield self.subscribe(on_event, u'com.myapp.topic1')
        self._notify("Subscribed with subscription ID {}".format(sub.id))

    def onDisconnect(self):
        """Stop the reactor once the transport is gone."""
        # Local import keeps this block self-contained.
        from twisted.internet.error import ReactorNotRunning

        try:
            reactor.stop()
        except ReactorNotRunning:
            # The reactor was already stopped (for example the UI quit
            # first), so there is nothing left to shut down.
            pass

class MyApp(object):
    """Small urwid front end that connects a WAMP client session to a router.

    Constructing an instance sets up the WebSocket client, builds the urwid
    main loop on top of the Twisted reactor and runs it; ``__init__`` returns
    only when that loop exits.
    """

    # Text widget shown at the top of the screen.  Kept as a class attribute
    # so existing references to ``MyApp.txt`` continue to work.
    txt = urwid.Text(u"Hello World")

    def __init__(self):
        # ``extra`` hands this application object to every session created
        # by the factory; a session can reach it as ``self.config.extra``
        # and call ``updateText`` on it.
        component_config = types.ComponentConfig(realm="realm1", extra=self)
        session_factory = wamp.ApplicationSessionFactory(config=component_config)
        session_factory.session = MyFrontendComponent
        serializers = [JsonSerializer()]
        transport_factory = websocket.WampWebSocketClientFactory(session_factory,
                                                             serializers=serializers, debug=False, debug_wamp=False)
        client = clientFromString(reactor, "tcp:127.0.0.1:8080")
        # Report a failed connection in the UI instead of leaving the
        # Deferred's error unhandled.
        client.connect(transport_factory).addErrback(self._connection_failed)

        fill = urwid.Filler(self.txt, 'top')
        # Keep the loop on the instance so updateText can request a repaint.
        self.loop = urwid.MainLoop(fill, unhandled_input=self.show_or_exit, event_loop=urwid.TwistedEventLoop())
        self.loop.run()

    def _connection_failed(self, failure):
        """Errback for the client connection: show the reason on screen."""
        self.updateText(u"Connection failed: {}".format(failure.getErrorMessage()))

    def updateText(self, input):
        """Replace the displayed text with *input* and repaint the screen.

        The parameter name shadows the ``input`` builtin; it is kept for
        compatibility with callers that pass it by keyword.
        """
        self.txt.set_text(input)
        # Calls arriving from Twisted callbacks that urwid did not schedule
        # itself are not followed by an automatic redraw, so ask for one
        # explicitly when the screen is active.
        loop = getattr(self, 'loop', None)
        if loop is not None and loop.screen.started:
            loop.draw_screen()

    def show_or_exit(self, key):
        """Quit on ``q``/``Q``; otherwise display the key that was pressed."""
        if key in ('q', 'Q'):
            raise urwid.ExitMainLoop()
        self.txt.set_text(repr(key))

if __name__ == '__main__':
    # Script entry point: MyApp.__init__ sets up the client connection and
    # runs the urwid main loop, so this call does the whole job.
    MyApp()

以及服务器代码:

import sys
import six
import datetime
from twisted.python import log
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.endpoints import serverFromString
from autobahn.wamp import types
from autobahn.twisted.util import sleep
from autobahn.twisted import wamp, websocket

class MyBackendComponent(wamp.ApplicationSession):
    """Backend session: serves the current UTC time and publishes a counter."""

    @inlineCallbacks
    def onJoin(self, details):
        """Register ``com.timeservice.now`` and then publish events forever.

        ``details`` is supplied by the WAMP layer and is not used here.
        """
        # Remote procedure: the current UTC time as an ISO-8601 style string.
        def utcnow():
            print("Someone is calling me;)")
            stamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
            return six.u(stamp)

        registration = yield self.register(utcnow, u'com.timeservice.now')
        print("Registered procedure with ID {}".format(registration.id))

        # Publish an increasing integer to the topic, pausing one second
        # between events.  The loop has no exit condition.
        tick = 0
        while True:
            self.publish(u'com.myapp.topic1', tick)
            print("Published event.")
            tick = tick + 1
            yield sleep(1)

if __name__ == '__main__':
    # Send Twisted's log output to the console.
    log.startLogging(sys.stdout)

    # Router factory, and the session factory that is built on top of it.
    router = wamp.RouterFactory()
    sessions = wamp.RouterSessionFactory(router)

    # Embed the backend component in the router, joined to realm "realm1".
    backend_config = types.ComponentConfig(realm="realm1")
    sessions.add(MyBackendComponent(backend_config))

    # WAMP-over-WebSocket transport with debugging switched off.
    transport = websocket.WampWebSocketServerFactory(
        sessions,
        debug=False,
        debug_wamp=False,
    )

    # Listen on TCP port 8080, then hand control to the Twisted reactor.
    endpoint = serverFromString(reactor, "tcp:8080")
    endpoint.listen(transport)
    reactor.run()

谢谢!

【问题讨论】:

  • 您可以将extra 参数传递给ComponentConfig。稍后,在您的ApplicationSession 中,您可以使用self.config.extra 访问它。您可以使用此转发对MyApp 的引用。 autobahn.ws/python/reference/…

标签: python twisted autobahn urwid


【解决方案1】:

由于我的声望(reputation)不够,无法在评论中回复,所以就在这里回答! oberstet 在评论中给出的提示帮助我在不使用任何全局变量的情况下正确地做到了这一点! :)

谢谢

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-12-01
    • 2020-10-19
    • 1970-01-01
    • 2012-10-16
    • 2012-09-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多