【问题标题】:How to call asyncio functions inside flask application in python如何在 python 中的烧瓶应用程序中调用 asyncio 函数
【发布时间】:2019-12-14 18:36:59
【问题描述】:

我有一个烧瓶应用程序,它接收请求并尝试从给定的 URL 截取屏幕截图,这是使用 asyncio 函数完成的。

我所做的是,

import asyncio
from pyppeteer import launch
from flask import Flask
import base64
from flask import Blueprint, jsonify, request
import jwt
async def main():
    browser = await launch(headless=True)
    page = await browser.newPage()
    await page.goto(target)
    await page.screenshot({'path': '/tmp/screen.png', 'fullPage': True})
    await browser.close()
app = Flask(__name__)
@app.route('/heatMapDbConfigSave', methods=['POST'])
def notify():
    token, target,id = map(
            request.form.get, ('token', 'target','id'))
    asyncio.get_event_loop().run_until_complete(main(target))

if __name__ == '__main__':
    app.run(host='localhost', port=5002, debug=True)

我遇到的问题是,得到错误 RuntimeError: There is no current event loop in thread 'Thread-2'. 。我已经用谷歌搜索并浏览了以前的帖子。没有任何帮助,也没有指出明确的解决方案。

解决这个问题的方法是什么?

提前致谢!

【问题讨论】:

    标签: python python-3.x flask python-multithreading


    【解决方案1】:

    你可以试试下面的方法

    from flask import Blueprint
    import asyncio
    
    health_check = Blueprint('health_check', __name__)
    
    
    async def first():
        await asyncio.sleep(20)
        return 'first'
    
    async def second():
        await asyncio.sleep(10)
        return 'second'
    
    async def third():
        await asyncio.sleep(10)
        return 'third'
    
    def ordinary_generator():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for future in asyncio.as_completed([first(), second(), third()]):
            print('reached')
            yield loop.run_until_complete(future)
    
    @health_check.route('', methods=['GET'])
    def healthcheck():
        """
        Retrieves the health of the service.
        """
        for element in ordinary_generator():
            print(element)
        return "Health check passed"
    

    蓝图不是必需的,只是使用它们。您必须在主应用程序文件中注册蓝图,如下所示

        app = Flask(__name__)
        app.register_blueprint(health_check, url_prefix='/api/v1/healthcheck')
    
        if __name__ == '__main__':
            app.run()
    

    【讨论】:

      猜你喜欢
      • 2018-01-22
      • 1970-01-01
      • 2019-11-13
      • 1970-01-01
      • 1970-01-01
      • 2021-11-16
      • 1970-01-01
      • 2018-02-22
      • 1970-01-01
      相关资源
      最近更新 更多