【发布时间】:2015-09-06 03:35:09
【问题描述】:
有人对测试 Flask 内容流资源有任何经验/建议吗?我的应用程序使用 Redis Pub/Sub,当在频道中接收消息时,它会流式传输文本 'data: {"value":42}'
实现遵循 Flask 文档:docs
我的单元测试也是按照 Flask 文档完成的。发送到 Redis 发布/订阅的消息由第二个资源 (POST) 发送。
我正在创建一个线程来监听流,同时我在主应用程序上发布到发布到 Redis 的资源。
虽然连接断言通过(我收到 OK 200 和 mimetype 'text/event-stream'),但数据对象是空的。
我的单元测试是这样的:
def test_04_receiving_events(self):
headers = [('Content-Type', 'application/json')]
data = json.dumps({"value": 42})
headers.append(('Content-Length', len(data)))
def get_stream():
rv_stream = self.app.get('stream/data')
rv_stream_object = json.loads(rv_stream.data) #error (empty)
self.assertEqual(rv_stream.status_code, 200)
self.assertEqual(rv_stream.mimetype, 'text/event-stream')
self.assertEqual(rv_stream_object, "data: {'value': 42}")
t.stop()
threads = []
t = Thread(target=get_stream)
threads.append(t)
t.start()
time.sleep(1)
rv_post = self.app.post('/sensor', headers=headers, data=data)
threads_done = False
while not threads_done:
threads_done = True
for t in threads:
if t.is_alive():
threads_done = False
time.sleep(1)
应用资源是:
@app.route('/stream/data')
def stream():
def generate():
pubsub = db.pubsub()
pubsub.subscribe('interesting')
for event in pubsub.listen():
if event['type'] == 'message':
yield 'data: {"value":%s}\n\n' % event['data']
return Response(stream_with_context(generate()),
direct_passthrough=True,
mimetype='text/event-stream')
关于如何在 Flask 中测试内容流的任何指针或示例?除非我搜索错误的关键字,否则 Google 似乎在这方面没有多大帮助。
提前致谢。
【问题讨论】:
-
我相信您要做的是集成测试,而不是单元测试。
-
好点。整合。
-
也许这会有所帮助。我正在尝试做同样的事情,但我正在为客户认为内容就是数据而我应该模拟数据这一事实而苦苦挣扎......stackoverflow.com/questions/11399148/…
标签: python testing flask stream