【问题标题】:Unit testing Python Flask Stream单元测试 Python Flask Stream
【发布时间】:2015-09-06 03:35:09
【问题描述】:

有人对测试 Flask 内容流资源有任何经验/建议吗?我的应用程序使用 Redis Pub/Sub,当在频道中接收消息时,它会流式传输文本 'data: {"value":42}'

实现遵循 Flask 文档:docs

我的单元测试也是按照 Flask 文档完成的。发送到 Redis 发布/订阅的消息由第二个资源 (POST) 发送。

我正在创建一个线程来监听流,同时我在主应用程序上发布到发布到 Redis 的资源。

虽然连接断言通过(我收到 OK 200 和 mimetype 'text/event-stream'),但数据对象是空的。

我的单元测试是这样的:

def test_04_receiving_events(self):
    headers = [('Content-Type', 'application/json')]
    data = json.dumps({"value": 42})
    headers.append(('Content-Length', len(data)))

    def get_stream():
        rv_stream = self.app.get('stream/data')
        rv_stream_object = json.loads(rv_stream.data) #error (empty)
        self.assertEqual(rv_stream.status_code, 200)
        self.assertEqual(rv_stream.mimetype, 'text/event-stream')
        self.assertEqual(rv_stream_object, "data: {'value': 42}")
        t.stop()

    threads = []
    t = Thread(target=get_stream)
    threads.append(t)
    t.start()
    time.sleep(1)
    rv_post = self.app.post('/sensor', headers=headers, data=data)

    threads_done = False
    while not threads_done:
        threads_done = True
        for t in threads:
            if t.is_alive():
                threads_done = False
                time.sleep(1)

应用资源是:

@app.route('/stream/data')
def stream():
    def generate():
        pubsub = db.pubsub()
        pubsub.subscribe('interesting')

        for event in pubsub.listen():

            if event['type'] == 'message':
                yield 'data: {"value":%s}\n\n' % event['data']
    return Response(stream_with_context(generate()),
                direct_passthrough=True,
                mimetype='text/event-stream')

关于如何在 Flask 中测试内容流的任何指针或示例?除非我搜索错误的关键字,否则 Google 似乎在这方面没有多大帮助。

提前致谢。

【问题讨论】:

  • 我相信您要做的是集成测试,而不是单元测试。
  • 好点。整合。
  • 也许这会有所帮助。我正在尝试做同样的事情,但我正在为客户认为内容就是数据而我应该模拟数据这一事实而苦苦挣扎......stackoverflow.com/questions/11399148/…

标签: python testing flask stream


【解决方案1】:

在将任何数据发布到 REDIS 之前,您将线程启动到 get_stream()

这意味着它不会找到任何事件,并且会在不流式传输任何数据的情况下返回。

我相信您根本不需要线程,您可以简单地使用 POST 为您的集成测试设置数据,然后调用流。

def test_04_receiving_events(self):
    headers = [('Content-Type', 'application/json')]
    data = json.dumps({"value": 42})
    headers.append(('Content-Length', len(data)))
    rv_post = self.app.post('/sensor', headers=headers, data=data)

    rv_stream = self.app.get('stream/data')
    rv_stream_object = json.loads(rv_stream.data)

    self.assertEqual(rv_stream.status_code, 200)
    self.assertEqual(rv_stream.mimetype, 'text/event-stream')
    self.assertEqual(rv_stream_object, "data: {'value': 42}")

【讨论】:

    【解决方案2】:

    如果您对集成测试感兴趣而不是单元测试,我的建议是使用 Postman 应用程序及其与 newman 的命令行集成。

    您可以将环境中的所有变量和参数以及要在集合中测试的所有请求。 Postman 为此提供了文档和教程,请参阅https://learning.postman.com/。免费版可以做很多事情。

    设置 Authorization、Params、Headers 以匹配来自向您的实例发送请求以进行测试的服务的传入请求。 当您准备好请求以在邮递员中成功运行后,您可以在邮递员中编写测试,您可以在其中断言请求响应正文的特定部分。 Postman 还提供了 pre-build sn-ps 来编写这些。

        pm.test("Body includes 'some string'", function () {
            pm.expect(pm.response.text()).to.include("some string");
        });
    

    您还可以将您的请求集合(包括测试)和环境导出为 json,并使用 newman 在 shell 中运行它。

       $ newman run mycollection.json -e dev_environment.json
    

    如果您想将测试集成到 CICD 管道中,这很有用。

    【讨论】:

      猜你喜欢
      • 2020-07-07
      • 2019-02-25
      • 1970-01-01
      • 1970-01-01
      • 2013-11-26
      • 1970-01-01
      • 1970-01-01
      • 2022-01-11
      • 1970-01-01
      相关资源
      最近更新 更多