【Question title】: Parallelize calls to an API with a hard limit per minute
【Posted】: 2020-08-31 17:07:08
【Question】:

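I'm trying to make parallel calls to an API. The API allows at most 1,200 calls per minute before it cuts me off. What is the most efficient way to make the calls asynchronously while staying under that limit?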

import asyncio
import re

import aiohttp

TAG_RE = re.compile('<.*?>')

def remove_html_tags(text):
    """Remove html tags from a string"""
    return TAG_RE.sub(' ', text)

async def getRez(df, url):
    async with aiohttp.ClientSession() as session:
        auth = aiohttp.BasicAuth('username', pwd)

        async with session.get(url, auth=auth) as r:
            if r.status == 200:
                content = await r.text()
                text = remove_html_tags(content)
            else:
                text = '500 Server Error'
        df.loc[df['url'] == url, ['RezText']] = [[text]]

async def main(df):
    df['RezText'] = None
    # all requests start at once; nothing keeps this under 1,200 calls/minute
    await asyncio.gather(*[getRez(df, url) for url in df['url']])
    df['wordCount'] = df['RezText'].apply(lambda x: len(str(x).split(" ")))
    return df[df["RezText"] != "500 Server Error"]

# df: a pandas DataFrame with a 'url' column; pwd: the API password (both defined elsewhere)
data = asyncio.run(main(df))

【Discussion】:

    Tags: python async-await aiohttp


    【Solution 1】:

    1,200 calls per minute works out to 20 calls per second, so you can split your requests into batches of 20 and sleep for one second between batches.
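The batching idea can be pulled out of the aiohttp code into a small reusable helper. This is a sketch under assumptions, not the answer's code: run_in_batches and fake_request are hypothetical names, and fake_request stands in for a real HTTP call so the timing can be checked offline. The batch size is derived from the limit (1,200 // 60 = 20 starts per second).

```python
import asyncio
import time

CALLS_PER_MINUTE = 1200
BATCH_SIZE = CALLS_PER_MINUTE // 60  # 20 calls per second


async def run_in_batches(items, worker, batch_size=BATCH_SIZE, interval=1.0):
    """Start worker(item) for at most batch_size items per interval seconds.

    Returns the results in the same order as items.
    """
    tasks = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        tasks.extend(asyncio.create_task(worker(item)) for item in batch)
        if start + batch_size < len(items):  # no need to wait after the last batch
            await asyncio.sleep(interval)
    return await asyncio.gather(*tasks)


start_times = []


async def fake_request(item):
    # Hypothetical stand-in for an HTTP call: record when it started, then "wait" on I/O.
    start_times.append(time.monotonic())
    await asyncio.sleep(0.01)
    return item * 2


if __name__ == "__main__":
    results = asyncio.run(run_in_batches(list(range(45)), fake_request, interval=0.2))
    print(results[:5])  # prints [0, 2, 4, 6, 8]
```

With a real client, the worker would be a coroutine function taking one URL, for example functools.partial(fetch, session) built from the answer's fetch.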

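    Another option is to use aiohttp.TCPConnector(limit=20) for the client session, but that only limits the number of concurrent requests, so you may end up making more requests (if the API responds in less than a second) or fewer (if it takes longer than a second); see this related question.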
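To see why a concurrency cap alone does not enforce a rate, here is a small offline simulation (not from the answer). It assumes asyncio.Semaphore(20) behaves like TCPConnector(limit=20) for this purpose, and the latency is an assumed parameter rather than a measured API figure. With 20 slots and 0.05 s per response, about 20 / 0.05 = 400 requests can start per second, far above the 20 per second budget.

```python
import asyncio


async def starts_per_second(concurrency, latency, duration=0.5, backlog=1000):
    """Simulate a concurrency cap and return how many requests start per second.

    latency is an assumed server response time; no network is used.
    """
    sem = asyncio.Semaphore(concurrency)  # stand-in for TCPConnector(limit=...)
    started = 0

    async def fake_request():
        nonlocal started
        async with sem:  # at most `concurrency` requests in flight
            started += 1
            await asyncio.sleep(latency)

    # queue far more work than can finish, so the cap is the only limit
    tasks = [asyncio.create_task(fake_request()) for _ in range(backlog)]
    await asyncio.sleep(duration)
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return started / duration


if __name__ == "__main__":
    # on the order of 20 / 0.05 = 400 starts per second
    print(asyncio.run(starts_per_second(concurrency=20, latency=0.05)))
```

The same arithmetic with a 2 s latency gives about 20 / 2 = 10 requests per second, under the budget. Either way the rate depends on how fast the server answers, which is why the answer throttles with an explicit sleep instead.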

    Batching example:

    # python 3.7+
    import asyncio
    
    import aiohttp
    
    async def fetch(session, url):
        """Return the JSON body for url, or None if the status is not 200."""
        data = None
        async with session.get(url) as response:
            if response.status != 200:
                text = await response.text()
                print("cannot retrieve %s: status: %d, reason: %s" % (url, response.status, text))
            else:
                data = await response.json()
        return data
    
    async def main(n):
        print("starting")
        tasks = []
        batch = []
        async with aiohttp.ClientSession() as session:
            for i in range(n):
                batch.append("http://httpbin.org/anything?key=a%d" % i)
                if len(batch) >= 20:
                    print("issuing batch %d:%d" % (i-20+1, i+1))
                    for url in batch:
                        # create_task schedules the request without waiting for it
                        tasks.append(asyncio.create_task(fetch(session, url)))
                    batch = []
                    await asyncio.sleep(1)  # throttle: at most 20 new requests per second
            if batch:  # n is not a multiple of 20: issue the remaining URLs
                print("issuing last batch %d:%d" % (n-len(batch), n))
                for url in batch:
                    tasks.append(asyncio.create_task(fetch(session, url)))
            # wait for every task before the session is closed;
            # results come back in the order the requests were made
            responses = await asyncio.gather(*tasks, return_exceptions=True)
        for response in responses:
            # a failed request gives None (non-200 status) or an exception object
            if isinstance(response, dict):
                assert "args" in response
        print("finished")
    
    if __name__ == "__main__":
        asyncio.run(main(111))
    

    Output:

    starting
    issuing batch 0:20
    issuing batch 20:40
    issuing batch 40:60
    issuing batch 60:80
    issuing batch 80:100
    issuing last batch 100:111
    finished
    

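    The important parts here are asyncio.create_task (wraps the coroutine in a Task and schedules it to run, returning the Task object), await asyncio.sleep(1) (throttles the requests), and await asyncio.gather (waits for all the tasks to finish).
    For Python < 3.7, use asyncio.ensure_future instead of asyncio.create_task.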
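The batches above send bursts of 20 at the start of each second. If the API counts calls over a rolling window, spacing the starts evenly may be gentler. This is a different technique from the answer, shown as a sketch: paced_gather and fake_request are hypothetical names, and it leaves at least 60 / calls_per_minute seconds (0.05 s for 1,200 per minute) between consecutive starts while earlier requests keep running concurrently.

```python
import asyncio
import time


async def paced_gather(items, worker, calls_per_minute=1200):
    """Start worker(item) for each item, leaving at least 60 / calls_per_minute
    seconds between consecutive starts; return results in the order of items."""
    interval = 60.0 / calls_per_minute  # 0.05 s for 1,200 calls per minute
    tasks = []
    for i, item in enumerate(items):
        if i:
            # sleeping here also lets the previously created task start running
            await asyncio.sleep(interval)
        tasks.append(asyncio.create_task(worker(item)))
    return await asyncio.gather(*tasks)


start_times = []


async def fake_request(item):
    # Hypothetical stand-in for an HTTP call: record the start time, then wait on "I/O".
    start_times.append(time.monotonic())
    await asyncio.sleep(0.01)
    return item + 1


if __name__ == "__main__":
    print(asyncio.run(paced_gather(list(range(5)), fake_request)))  # prints [1, 2, 3, 4, 5]
```

Because each wait is a full interval after the previous start, a slow event loop can only make the pacing slower, never faster than the limit.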
