【问题标题】:How to reuse aiohttp ClientSession pool?如何重用 aiohttp ClientSession 池?
【发布时间】:2017-10-28 15:49:16
【问题描述】:

文档说要重用 ClientSession:

不要为每个请求创建会话。您很可能需要一个会话 完全执行所有请求的应用程序。

一个会话内部包含一个连接池,连接重用和 keep-alives(两者都默认开启)可以提高整体性能。1

但是文档中似乎没有关于如何执行此操作的任何解释?有一个可能相关的示例,但它没有说明如何在其他地方重用池:http://aiohttp.readthedocs.io/en/stable/client.html#keep-alive-connection-pooling-and-cookie-sharing

这样的做法是否正确?

@app.listener('before_server_start')
async def before_server_start(app, loop):
    app.pg_pool = await asyncpg.create_pool(**DB_CONFIG, loop=loop, max_size=100)
    app.http_session_pool = aiohttp.ClientSession()


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    app.http_session_pool.close()
    app.pg_pool.close()


@app.post("/api/register")
async def register(request):
    # json validation
    async with app.pg_pool.acquire() as pg:
        await pg.execute()  # create unactivated user in db
        async with app.http_session_pool as session:
            # TODO send activation email using SES API
            async with session.post('http://httpbin.org/post', data=b'data') as resp:
                print(resp.status)
                print(await resp.text())
        return HTTPResponse(status=204)

【问题讨论】:

    标签: python-asyncio aiohttp sanic


    【解决方案1】:

    我认为可以改进的地方很少:

    1)

    ClientSession 的实例是一个会话对象。这个会话包含连接池,但它本身不是“session_pool”。我建议将 http_session_pool 重命名为 http_session 或者可能是 client_session

    2)

    会话的close() 方法is a corountine。你应该等待它:

    await app.client_session.close()
    

    甚至更好(恕我直言),而不是考虑如何正确打开/关闭会话使用标准异步上下文管理器等待__aenter__ / __aexit__

    @app.listener('before_server_start')
    async def before_server_start(app, loop):
        # ...
        app.client_session = await aiohttp.ClientSession().__aenter__()
    
    
    @app.listener('after_server_stop')
    async def after_server_stop(app, loop):
        await app.client_session.__aexit__(None, None, None)
        # ...
    

    3)

    关注this info

    但是,如果事件循环在底层连接之前停止 关闭,发出ResourceWarning: unclosed transport 警告 (启用警告时)。

    为避免这种情况,必须在关闭前添加一个小延迟 允许关闭任何打开的底层连接的事件循环。

    我不确定在您的情况下这是强制性的,但在 after_server_stop 中添加 await asyncio.sleep(0) 作为文档建议并没有什么不好:

    @app.listener('after_server_stop')
    async def after_server_stop(app, loop):
        # ...
        await asyncio.sleep(0)  # http://aiohttp.readthedocs.io/en/stable/client.html#graceful-shutdown
    

    更新:

    实现__aenter__/__aexit__的类可以用作async context manager(可以在async with语句中使用)。它允许在执行内部块之前和之后执行一些操作。这与常规上下文管理器非常相似,但与asyncio 相关。与常规上下文管理器异步相同,可以直接使用(无需async with)手动等待__aenter__ / __aexit__

    为什么我认为使用 __aenter__ / __aexit__ 手动创建/释放会话比使用 close() 更好?因为我们不应该担心__aenter__ / __aexit__ 内部实际发生的事情。想象一下,在aiohttp 的未来版本中,会话的创建将被更改,例如需要等待open()。如果您使用__aenter__ / __aexit__,则无需更改代码。

    【讨论】:

    • 只是添加到 3),它说如果使用 SSL(即 https?)则使用 0.25s。另外,我不想问,但我没有在其他地方找到__aenter__ / __aexit__ 的任何解释......你能解释一下它有什么不同吗? (看起来它们只是带有 @asyncio.coroutine 装饰器的包装器?)
    • @dtgq 我更新了答案,添加了一些关于__aenter__ / __aexit__ 的信息。什么是 0.25 秒 - 是的,如果您认为您可以请求 https 网址,这是有道理的。
    【解决方案2】:

    在我的代码触发此警告消息后,我在 Google 上搜索如何重用 aiohttp ClientSession 实例后发现了这个问题:UserWarning: Creating a client session outside of coroutine is a very dangerous idea

    此代码虽然相关,但可能无法解决上述问题。我是 asyncio 和 aiohttp 的新手,所以这可能不是最佳实践。这是我在阅读了很多看似矛盾的信息后能想到的最好的方法。

    我从打开上下文的 Python 文档中创建了一个类 ResourceManager。

    ResourceManager 实例通过魔术方法 __aenter____aexit__ 以及 BaseScraper.set_session 和 BaseScraper.close_session 包装方法来处理 aiohttp ClientSession 实例的打开和关闭。

    我能够使用以下代码重用 ClientSession 实例。

    BaseScraper 类也有验证方法。依赖lxml第三方包。

    import asyncio
    from time import time
    from contextlib import contextmanager, AbstractContextManager, ExitStack
    
    import aiohttp
    import lxml.html
    
    
    class ResourceManager(AbstractContextManager):
        # Code taken from Python docs: 29.6.2.4. of https://docs.python.org/3.6/library/contextlib.html
    
        def __init__(self, scraper, check_resource_ok=None):
            self.acquire_resource = scraper.acquire_resource
            self.release_resource = scraper.release_resource
            if check_resource_ok is None:
    
                def check_resource_ok(resource):
                    return True
    
            self.check_resource_ok = check_resource_ok
    
        @contextmanager
        def _cleanup_on_error(self):
            with ExitStack() as stack:
                stack.push(self)
                yield
                # The validation check passed and didn't raise an exception
                # Accordingly, we want to keep the resource, and pass it
                # back to our caller
                stack.pop_all()
    
        def __enter__(self):
            resource = self.acquire_resource()
            with self._cleanup_on_error():
                if not self.check_resource_ok(resource):
                    msg = "Failed validation for {!r}"
                    raise RuntimeError(msg.format(resource))
            return resource
    
        def __exit__(self, *exc_details):
            # We don't need to duplicate any of our resource release logic
            self.release_resource()
    
    
    class BaseScraper:
        login_url = ""
        login_data = dict()  # dict of key, value pairs to fill the login form
        loop = asyncio.get_event_loop()
    
        def __init__(self, urls):
            self.urls = urls
            self.acquire_resource = self.set_session
            self.release_resource = self.close_session
    
        async def _set_session(self):
            self.session = await aiohttp.ClientSession().__aenter__()
    
        def set_session(self):
            set_session_attr = self.loop.create_task(self._set_session())
            self.loop.run_until_complete(set_session_attr)
            return self  # variable after "as" becomes instance of BaseScraper
    
        async def _close_session(self):
            await self.session.__aexit__(None, None, None)
    
        def close_session(self):
            close_session = self.loop.create_task(self._close_session())
            self.loop.run_until_complete(close_session)
    
        def __call__(self):
            fetch_urls = self.loop.create_task(self._fetch())
            return self.loop.run_until_complete(fetch_urls)
    
        async def _get(self, url):
            async with self.session.get(url) as response:
                result = await response.read()
            return url, result
    
        async def _fetch(self):
            tasks = (self.loop.create_task(self._get(url)) for url in self.urls)
            start = time()
            results = await asyncio.gather(*tasks)
            print(
                "time elapsed: {} seconds \nurls count: {}".format(
                    time() - start, len(urls)
                )
            )
            return results
    
        @property
        def form(self):
            """Create and return form for authentication."""
            form = aiohttp.FormData(self.login_data)
            get_login_page = self.loop.create_task(self._get(self.login_url))
            url, login_page = self.loop.run_until_complete(get_login_page)
    
            login_html = lxml.html.fromstring(login_page)
            hidden_inputs = login_html.xpath(r'//form//input[@type="hidden"]')
            login_form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
            for key, value in login_form.items():
                form.add_field(key, value)
            return form
    
        async def _login(self, form):
            async with self.session.post(self.login_url, data=form) as response:
                if response.status != 200:
                    response.raise_for_status()
                print("logged into {}".format(url))
                await response.release()
    
        def login(self):
            post_login_form = self.loop.create_task(self._login(self.form))
            self.loop.run_until_complete(post_login_form)
    
    
    if __name__ == "__main__":
        urls = ("http://example.com",) * 10
        base_scraper = BaseScraper(urls)
        with ResourceManager(base_scraper) as scraper:
            for url, html in scraper():
                print(url, len(html))
    

    【讨论】:

      【解决方案3】:

      aiohttp 中似乎没有会话池。
      // 只是发布一些官方文档。


      持续会话

      这里是persistent-session官网的使用演示
      https://docs.aiohttp.org/en/latest/client_advanced.html#persistent-session

      app.cleanup_ctx.append(persistent_session)
      
      async def persistent_session(app):
         app['PERSISTENT_SESSION'] = session = aiohttp.ClientSession()
         yield
         await session.close()
      
      async def my_request_handler(request):
         session = request.app['PERSISTENT_SESSION']
         async with session.get("http://python.org") as resp:
             print(resp.status)
      

      //TODO:完整的可运行演示代码

      连接池

      它有一个连接池:
      https://docs.aiohttp.org/en/latest/client_advanced.html#connectors

      conn = aiohttp.TCPConnector()
      #conn = aiohttp.TCPConnector(limit=30)
      #conn = aiohttp.TCPConnector(limit=0)  # nolimit, default is 100.
      #conn = aiohttp.TCPConnector(limit_per_host=30) # default is 0
      
      session = aiohttp.ClientSession(connector=conn)
      
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-02-05
        • 2022-12-15
        相关资源
        最近更新 更多