【Question Title】: Get JSON using Python and AsyncIO
【Posted】: 2018-11-07 23:06:20
【Question Description】:

A while ago I started learning asyncio, and I've run into a problem: my code never terminates, and I can't figure out why. Please help me!

import signal
import sys
import asyncio
import aiohttp
import json

loop = asyncio.get_event_loop()
client = aiohttp.ClientSession(loop=loop)

async def get_json(client, url):
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_reddit_cont(subreddit, client):
    data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=50')

    jn = json.loads(data1.decode('utf-8'))

    print('DONE:', subreddit)

def signal_handler(signal, frame):
    loop.stop()
    client.close()
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

for key in {'python':1, 'programming':2, 'compsci':3}:
    asyncio.ensure_future(get_reddit_cont(key, client))
loop.run_forever()

Result:

DONE: compsci  
DONE: programming  
DONE: python  
...

I tried to get it working, but the results were not stable:

future = []
for key in {'python':1, 'programming':2, 'compsci':3}:
    future=asyncio.ensure_future(get_reddit_cont(key, client))
loop.run_until_complete(future)

Result (1 task instead of 3):

DONE: compsci  
[Finished in 1.5s]  

I solved my problem like this:

By adding:

async with aiohttp.ClientSession() as client:

inside:

async def get_reddit_cont(subreddit, client):

还有:

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    futures = [get_reddit_cont(subreddit,client) for subreddit in range(1,6)]
    result = loop.run_until_complete(asyncio.gather(*futures))

But when the code finishes, I get this message:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x034021F0>
[Finished in 1.0s]

I don't understand why this happens.

And when I try to run the "for key" loop about 60 or more times, I get an error:

...
aiohttp.client_exceptions.ClientOSError: [WinError 10054] An existing connection was forcibly closed by the remote host

【Question Discussion】:

    Tags: python python-asyncio


    【Solution 1】:

    Here are some suggested changes, with context given in the comments.

    Unless you really have a unique use case, or are just experimenting for learning's sake, there's probably no reason to use signal -- asyncio has top-level functions that let you decide when to close and terminate the event loop.

    import asyncio
    import logging
    import sys
    
    import aiohttp
    
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                        format='%(asctime)s:%(message)s')
    
    URL = 'https://www.reddit.com/r/{subreddit}/top.json?sort=top&t=day&limit=50'
    
    
    async def get_json(client: aiohttp.ClientSession, url: str) -> dict:
        # If you're going to be making repeated requests, use this
        # over .get(), which is just a wrapper around `.request()` and
        # involves an unneeded lookup
        async with client.request('GET', url) as response:
    
            # Raise if the response code is >= 400.
            # Some 200 codes may still be "ok".
            # You can also pass raise_for_status within
            # client.request().
            response.raise_for_status()
    
            # Let your code be fully async.  The call to json.loads()
            # is blocking and won't take full advantage.
            #
            # And it does largely the same thing you're doing now:
            # https://github.com/aio-libs/aiohttp/blob/76268e31630bb8615999ec40984706745f7f82d1/aiohttp/client_reqrep.py#L985
            j = await response.json()
            logging.info('DONE: got %s, size %s', url, j.__sizeof__())
            return j
    
    
    async def get_reddit_cont(keys, **kwargs) -> list:
        async with aiohttp.ClientSession(**kwargs) as session:
            # Use a single session as a context manager.
            # this enables connection pooling, which matters a lot when
            # you're only talking to one site
            tasks = []
            for key in keys:
                # create_task: Python 3.7+
                task = asyncio.create_task(
                    get_json(session, URL.format(subreddit=key)))
                tasks.append(task)
            # The result of this will be a list of dictionaries
            # It will only return when all of your subreddits
            # have given you a response & been decoded
            #
            # To process greedily, use asyncio.as_completed()
            return await asyncio.gather(*tasks, return_exceptions=True)
    
    
    if __name__ == '__main__':
        default = ('python', 'programming', 'compsci')
        keys = sys.argv[1:] if len(sys.argv) > 1 else default
        sys.exit(asyncio.run(get_reddit_cont(keys=keys)))
    

    Output:

    $ python3 asyncreddit.py 
    2018-11-07 21:44:49,495:Using selector: KqueueSelector
    2018-11-07 21:44:49,653:DONE: got https://www.reddit.com/r/compsci/top.json?sort=top&t=day&limit=50, size 216
    2018-11-07 21:44:49,713:DONE: got https://www.reddit.com/r/python/top.json?sort=top&t=day&limit=50, size 216
    2018-11-07 21:44:49,947:DONE: got https://www.reddit.com/r/programming/top.json?sort=top&t=day&limit=50, size 216
    

    Edit: in response to your question:

    But when the code finishes, I get the message: Unclosed client session

    This is because you need to .close() the client object, just as you would a file object. You can do that in one of two ways:

    • Call it explicitly: client.close(). It is safer to wrap this in a try/finally block to make sure it gets closed no matter what
    • Or (the easier way), use the client as an async context manager, as in this answer. That means that after the async with block exits, the session is automatically closed via its .__aexit__() method.

    The connector is the underlying TCPConnector, an attribute of the session. It handles connection pooling, and it is what ultimately stays open in your code.
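
    Both closing styles can be sketched as follows (the function names and URL handling are illustrative; note that in aiohttp 3.x, .close() is a coroutine and must be awaited):

```python
import asyncio
import aiohttp

async def fetch_explicit(url):
    # Option 1: close the session explicitly; try/finally ensures
    # it is closed even if the request raises.
    session = aiohttp.ClientSession()
    try:
        async with session.get(url) as response:
            return await response.text()
    finally:
        await session.close()  # a coroutine in aiohttp 3.x

async def fetch_managed(url):
    # Option 2: the async context manager calls .__aexit__()
    # (and thus .close()) automatically on exit.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
```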

    【Discussion】:

    • In this case, how would you skip or ignore the result if any of the clients doesn't respond? For example on a timeout or a 404 error?
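
    One way to handle the timeout/404 question above, sketched with dummy coroutines (the real version would wrap the aiohttp call): pass return_exceptions=True to gather() and filter the results afterwards.

```python
import asyncio

async def fetch(key):
    # Stand-in for a real request; 'bad' simulates a timeout/404.
    if key == 'bad':
        raise TimeoutError(f'{key} timed out')
    await asyncio.sleep(0)
    return {'subreddit': key}

async def main():
    # return_exceptions=True turns failures into return values
    # instead of cancelling the whole gather().
    results = await asyncio.gather(
        fetch('python'), fetch('bad'), fetch('compsci'),
        return_exceptions=True)
    # Keep the successes, skip anything that failed.
    return [r for r in results if not isinstance(r, Exception)]

good = asyncio.run(main())
print(good)  # [{'subreddit': 'python'}, {'subreddit': 'compsci'}]
```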
    【Solution 2】:

    The answer lies in your code. Here's the clue: loop.run_forever(). So you need to call loop.stop(). I would use a condition, such as an if clause or a while loop.

    if we_have_what_we_need:
        signal_handler(signal, frame)
    

    while we_dont_have_what_we_need:
        loop.run_forever()
    

    The first will stop your code when the condition is met. The latter will keep running until the condition is met.
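
    The first pattern can be made concrete like this (a stdlib-only sketch; the sleep stands in for the real request):

```python
import asyncio

results = []

async def work(name):
    await asyncio.sleep(0.01)              # stand-in for the real request
    results.append(name)
    if len(results) == 3:                  # "we have what we need"
        asyncio.get_running_loop().stop()  # makes run_forever() return

loop = asyncio.new_event_loop()
for name in ('python', 'programming', 'compsci'):
    loop.create_task(work(name))
loop.run_forever()  # blocks until loop.stop() is called above
loop.close()
print(sorted(results))  # ['compsci', 'programming', 'python']
```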

    [Update]

    We can also use:

    (Python docs)

    loop.run_until_complete(future)
    

    Run until the future (an instance of Future) has completed.

    If the argument is a coroutine object, it is implicitly scheduled to run as an asyncio.Task.

    Return the Future's result or raise its exception.

    loop.run_forever()
    

    Run the event loop until stop() is called.
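
    A minimal, stdlib-only contrast of the two entry points:

```python
import asyncio

async def answer():
    await asyncio.sleep(0)
    return 42

# run_until_complete(): wraps the coroutine in a Task, runs the
# loop until it finishes, and hands back its result.
loop = asyncio.new_event_loop()
result = loop.run_until_complete(answer())
loop.close()
print(result)  # 42

# run_forever(): only returns once something calls loop.stop().
loop = asyncio.new_event_loop()
loop.call_soon(loop.stop)  # queue the stop before starting
loop.run_forever()         # returns almost immediately
loop.close()
```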

    【Discussion】:

    • Yes, I know the problem is in there, but I don't understand how to fix it. :(
    • You need to call loop.stop(). I would use a condition, such as an if clause or a while loop. You can find it in the [python docs] (docs.python.org/3/library/…)
    • @idrees The standard idiom is run_until_complete(asyncio.gather(coroutine1(), coroutine2(), ...)), which finishes once all the coroutines are done, so you no longer need run_forever. Brad's answer shows a variant for when the number of coroutines isn't known in advance.
    【Solution 3】:

    This is how I solved the problem:

    import asyncio
    import aiohttp
    import json
    
    async def get_json(client, url):
        async with client.get(url) as response:
            assert response.status == 200
            return await response.read()
    
    async def get_reddit_cont(subreddit):
        async with aiohttp.ClientSession(loop=loop) as client:
            data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=50')
    
            jn = json.loads(data1.decode('utf-8'))
    
            print('DONE:', subreddit)
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        futures = [get_reddit_cont(subreddit) for subreddit in {'python':1, 'programming':2, 'compsci':3}]
        result = loop.run_until_complete(asyncio.gather(*futures))
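
    On Python 3.7+ the same fix can be written with asyncio.run(), which creates and closes the event loop for you (a sketch; error handling omitted):

```python
import asyncio
import aiohttp

URL = 'https://www.reddit.com/r/{}/top.json?sort=top&t=day&limit=50'

async def get_reddit_cont(subreddit):
    # One session per call keeps the sketch short; to reuse
    # connections across many subreddits, share one session
    # as in Solution 1.
    async with aiohttp.ClientSession() as client:
        async with client.get(URL.format(subreddit)) as response:
            response.raise_for_status()
            data = await response.json()
            print('DONE:', subreddit)
            return data

async def main(subreddits):
    return await asyncio.gather(*(get_reddit_cont(s) for s in subreddits))

# Usage (requires network access):
# asyncio.run(main(['python', 'programming', 'compsci']))
```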
    

    【Discussion】:
