【问题标题】:Asynchronous code failure when connecting to redis连接redis时异步代码失败
【发布时间】:2021-07-19 15:44:47
【问题描述】:

我创建了一个小类来执行 redis 的基本操作,使用 aioredis。

class RedisService:
    def __init__(self, r_url) -> str:
        self.redis = r_url

    async def create_connection(self):
        return await aioredis.create_redis(self.redis)

    async def _get(self, key) -> str:
        try:
            return await self.create_connection().get(key, encoding='utf-8')
        finally:
            await self._close()

    async def _set(self, key, value) -> None:
        await self.create_connection().set(key, value)
        await self._close()

    async def _close(self) -> None:
        self.create_connection().close()
        await self._redis.wait_closed() 

还有一个测试处理程序来调用 redis 的写/读操作

@router.post('/perform')
async def index():
    key = 'test'
    value = 'test'
    value = await RedisService(r_url)._set(key, value)
    return {'result': value}

但报错

    await self.create_connection.set(key, value)
AttributeError: ''coroutine'' object has no attribute 'set'

我猜问题可能是异步代码必须通过事件循环运行

asyncio.run(some coroutine)

但我不明白如何将这个逻辑构建到我的代码中

【问题讨论】:

  • 回溯中的代码和您发布的代码不同。根据回溯,您似乎缺少一对括号,您在其中调用 set:await self.create_connection().set(key, value)。根据您发布的代码 sn-p,您在调用 get() 时缺少括号:await self.create_connection().get(key, encoding='utf-8')

标签: python redis async-await python-asyncio aioredis


【解决方案1】:

你的问题是你如何使用create_connection。你必须调用它并等待它返回什么。

await self.create_connection()

然后,您还需要等待 setget。作为单线,这会变得混乱。

await (await self.create_connection()).set(key, value)

为了帮助清理它,您应该将 await 拆分为单独的语句。

conn = await self.create_connection()
await conn.set(key, value)

每次需要执行操作时都创建一个新连接可能会很昂贵。我建议以一种或两种方式更改create_connection

要么让它连接到你的实例

async def create_connection(self):
    self.conn = await aioredis.create_redis(self.redis)

您可以在实例化RedisService 的实例后调用它,然后使用

await self.conn.set(key, value)

或者您可以切换到使用连接池。

async def create_connection(self):
    return await aioredis.create_redis_pool(self.redis)

【讨论】:

    【解决方案2】:

    uvicorn 在启动 fastapi 应用程序时提供的 event_loop 可以处理以异步方式调用 redis 获取/设置值。 参考 -> https://fastapi.tiangolo.com/tutorial/first-steps/

    “https://github.com/tiangolo/fastapi/issues/1694”上提供的以下代码 sn-p 已更改以适合问题中的提问。

    将创建连接结果移动到对象状态中有助于处理网络调用的异步解析。

    当路径“/”被查询对象的状态时,保持连接的状态将用于 以异步方式将结果设置为 redis。

    from fastapi import FastAPI
    from connections import redis_cache
    
    app = FastAPI()
    
    @app.on_event('startup')
    async def startup_event():
        await redis_cache.create_connection(r_url="redis://localhost")
    
    @app.on_event('shutdown')
    async def shutdown_event():
        await redis_cache._close()
    
    @app.get("/")
    async def root():
        key = 'key'
        value = 'data'
        await redis_cache._set(key, value)
    
    @app.get("/value")
    async def get_value():
        key = 'key'
        return await redis_cache._get(key)
    
    
    from typing import Optional
    from aioredis import Redis, create_redis
    
    
    class RedisCache:
        def __init__(self) -> str:
            self.redis_cache = None
    
        async def create_connection(self,r_url):
            self.redis_cache = await create_redis(r_url)
    
        async def _get(self, key) -> str:
            return await self.redis_cache.get(key)
    
        async def _set(self, key, value) -> None:
            await self.redis_cache.set(key, value)
    
        async def _close(self) -> None:
            self.redis_cache.close()
            await self.redis_cache.wait_closed()
    
    redis_cache = RedisCache()
    

    【讨论】:

    • 我很惊讶您从我使用 fastapi 的上下文中理解。这正是我在我的项目中实现的。谢谢
    猜你喜欢
    • 1970-01-01
    • 2020-06-16
    • 1970-01-01
    • 2015-11-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-12
    相关资源
    最近更新 更多