【发布时间】:2017-05-10 20:13:55
【问题描述】:
所以在应用程序中我必须测试 RPC 调用(有一个使用 Autobahn Python 制作的网络服务器)。我想对这些进行集成测试。所以我正在使用Pytest。我这样做的方式是这样的: 1)我有一个这样的测试类:
@pytest.mark.incremental
class TestCreateUser:
@pytest.fixture(scope="function", params="Json Test data")
def create_user_data(self, request):
return request.param
def test_valid_user_creation(self, create_user_data):
ws_realm = config["websocket"]["realm"]
runner = ApplicationRunner(
url="%s://%s:%s/ws" % (
config["websocket"]["protocol"], config["websocket"]["host"], config["websocket"]["port"]),
realm=ws_realm,
extra={"method": u"appname.create_user", "email": create_user_data["username"], create_user_data["password"]}
)
runner.run(MyComponent)
然后我有一个名为“MyComponent”的组件:
def __init__(self, config=None):
super().__init__(config)
@asyncio.coroutine
def onJoin(self, details):
try:
result = yield from self.call(self.config.extra["method"], self.config.extra["email"], self.config.extra["password"])
except Exception as e:
print("Error: {}".format(e))
else:
print(result)
self.leave()
def onDisconnect(self):
asyncio.get_event_loop().stop()
所以我有一些问题,首先我无法在测试类中获得 RPC 调用的结果,其次当我多次运行“test_valid_user_creation”(使用夹具系统)时,我遇到了这样的错误:
./../.pyenv/versions/novalibrary/lib/python3.4/site-packages/autobahn/asyncio/wamp.py:167: in run
(transport, protocol) = loop.run_until_complete(coro)
../../.pyenv/versions/3.4.3/lib/python3.4/asyncio/base_events.py:293: in run_until_complete
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=False>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
../../.pyenv/versions/3.4.3/lib/python3.4/asyncio/base_events.py:265: RuntimeError
所以我的问题是我如何检索 RPC 调用的值,以及当使用夹具多次调用测试时,我如何无法获得“事件循环已关闭”。
谢谢。
【问题讨论】:
-
您是否检查过您的
def onDisconnect(self):在每次运行时都被调用了?您必须关闭loop并等到终止,然后再开始新的。关于您的result,将其设为member或class MyComponent或复制到global var。 -
onDisconnect() 每次都会被调用;但我如何等待关闭循环。因为当我调用 asyncio.get_event_loop().close() 时,我遇到了异常。当我调用 asyncio.get_event_loop().stop() 然后调用 asyncio.get_event_loop().is_running() 它返回 true。
-
我的错,我错了。我的意思是
loop.stop()然后 wait 然后loop.close()。阅读有关 wait 的答案:stackoverflow.com/a/43810272/7414759
标签: python rpc pytest autobahn