【Question Title】: How to add time delay in asynchronous coroutines?
【Posted】: 2021-08-19 19:12:03
【Question Description】:

I am trying to retrieve historical data from Binance concurrently for every cryptocurrency pair in my database. I keep running into API bans that say: "APIError(code=-1003): Way too much request weight used; IP banned until 1629399758399. Please use the websocket for live updates to avoid bans."

How can I add a time delay so that I don't hit the API request-weight limit of 1200 per minute?

This is what I have so far:

import numpy as np
import pandas as pd  # pd is used below but was missing from the original imports
import json
import requests
import datetime, time
import aiohttp, asyncpg, asyncio
from asyncio import gather, create_task
from binance.client import AsyncClient
from multiprocessing import Process
import config


async def main():
    # create database connection pool
    pool = await asyncpg.create_pool(user=config.DB_USER, password=config.DB_PASS, database=config.DB_NAME, host=config.DB_HOST, command_timeout=60)
    
    # get a connection
    async with pool.acquire() as connection:
        cryptos = await connection.fetch("SELECT * FROM crypto")

        symbols = {}
        for crypto in cryptos:
            symbols[crypto['id']] = crypto['symbol']

    await get_prices(pool, symbols)

async def get_prices(pool, symbols):
    try:
        # schedule requests to run concurrently for all symbols
        tasks = [create_task(get_price(pool, crypto_id, symbols[crypto_id])) for crypto_id in symbols]
        await gather(*tasks)           
        print("Finalized all. Retrieved price data of {} outputs.".format(len(tasks)))
        
    except Exception as e:
        print("Unable to fetch crypto prices due to {}.".format(e.__class__))
        print(e) 

async def get_price(pool, crypto_id, url): 
    try:
        candlesticks = []
        client = await AsyncClient.create(config.BINANCE_API_KEY, config.BINANCE_SECRET_KEY)
        
        async for kline in await client.get_historical_klines_generator(crypto_id, AsyncClient.KLINE_INTERVAL_1HOUR, "18 Aug, 2021", "19 Aug, 2021"):
            candlesticks.append(kline)
        df = pd.DataFrame(candlesticks, columns = ["date","open","high","low","close","volume","Close time","Quote Asset Volume","Number of Trades","Taker buy base asset volume","Taker buy quote asset volume","Ignore"])
        df["date"] = pd.to_datetime(df.loc[:, "date"], unit ='ms')
        df.drop(columns=['Close time','Ignore', 'Quote Asset Volume', 'Number of Trades', 'Taker buy base asset volume', 'Taker buy quote asset volume'], inplace=True)
        df.loc[:, "id"] = crypto_id
        print(df)
    except Exception as e:
        print("Unable to get {} prices due to {}.".format(url, e.__class__))
        print(e)  

start = time.time()
if __name__ == "__main__":

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

end = time.time()

print("Took {} seconds.".format(end - start))

【Discussion】:

  • If there are a lot of symbols, you may be firing all of these calls at the same time. Perhaps run them in smaller batches?
  • Yes, I agree; I currently have about 1500 symbols in my database. I'm still a newbie to programming, could you explain how to run them in small batches?
  • Well, think about how you could split symbols into smaller batches, and then loop over those the same way you currently loop over the full symbols list?
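The batching idea from the comments could be sketched like this (a minimal illustration; `fetch_symbol` is a stand-in for the real per-symbol API task):

```python
import asyncio

async def fetch_symbol(symbol):
    # stand-in for the real API call
    await asyncio.sleep(0)
    return symbol

async def run_in_batches(symbols, batch_size=50, pause=1.0):
    results = []
    # process the symbol list in fixed-size slices, pausing between slices
    for i in range(0, len(symbols), batch_size):
        batch = symbols[i:i + batch_size]
        results += await asyncio.gather(*(fetch_symbol(s) for s in batch))
        if i + batch_size < len(symbols):
            await asyncio.sleep(pause)  # breathe between batches
    return results

print(asyncio.run(run_in_batches([f"SYM{n}" for n in range(5)], batch_size=2, pause=0)))
```

With 1500 symbols and a batch size of 50, this runs at most 50 requests concurrently and spreads the load out over time instead of launching all 1500 tasks at once.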

Tags: python async-await python-asyncio binance


【Solution 1】:

You can create an instance of a custom class that keeps count of the currently active requests (and the time of the requests), and only lets a request proceed when that guard says it is okay.

Python's `async with` statement lends itself well to such a construct, since it can both guard a block and decrement the active-request count afterwards, with minimal intervention in the code you already have.

It could work like this. The line in your code that actually triggers a request is:

client = await AsyncClient.create(config.BINANCE_API_KEY, config.BINANCE_SECRET_KEY)

So, if we can ensure that line is called at most 1200 times per minute, yielding back to the main loop whenever the limit would otherwise be exceeded, we are fine. While it would be possible to burst 1199 calls and then wait a full minute, things are simpler if we just space the calls out, one every (60 seconds / 1200) seconds (times 0.9, for a nice 10% safety margin).

`async with` calls the class's `__aenter__` method. There, we can simply check the interval elapsed since the last API call and sleep until that interval has passed. (In practice we need one instance of the class per task, since `__aenter__` has to be called on each one. But so as not to depend on a global "counter", the counters live on the class itself, and we can create a factory function that builds one guard class per API that needs throttling; we keep that class in a global variable.)
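As a reminder of the mechanics (this is not the guard itself, just a toy example): `async with obj:` awaits `obj.__aenter__()` on entry and `obj.__aexit__(...)` on exit, and both methods may themselves `await`:

```python
import asyncio

class Pause:
    """Tiny async context manager: sleeps on entry, passes exceptions through on exit."""
    def __init__(self, delay):
        self.delay = delay

    async def __aenter__(self):
        await asyncio.sleep(self.delay)  # this is where a rate guard would throttle
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # do not swallow exceptions

async def demo():
    async with Pause(0):
        return "inside the guarded block"

print(asyncio.run(demo()))
```

The ability to `await` inside `__aenter__` is exactly what lets the guard put the current task to sleep without blocking the other tasks.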

So you can add this factory function to your program, create a guard class near your main function, and use `async with` in the task code:

def create_rate_limit_guard(rate_limit=1200, safety_margin=0.9):
    """Rate limit is given in maximum requests per minute.
    """

    # TBD: it would be easy to have code throttle by maximum active requests
    # instead of total requests per minute.
    # I will leave the accounting of concurrent requests in place, though.

    class Guard:
        request_interval = (60 / rate_limit) * safety_margin
        current_requests = 0
        max_concurrent_requests = 0
        last_request = 0

        async def __aenter__(self):
            cls = self.__class__
            cls.current_requests += 1
            # sleep for whatever part of the interval has not yet elapsed
            if (elapsed := time.time() - cls.last_request) < cls.request_interval:
                await asyncio.sleep(cls.request_interval - elapsed)
            cls.last_request = time.time()

        async def __aexit__(self, exc_type, exc, tb):
            cls = self.__class__
            cls.max_concurrent_requests = max(cls.max_concurrent_requests, cls.current_requests)
            cls.current_requests -= 1

    return Guard

In your code, you can then change `get_price` to the following, and create the guard class (as the last line before the `if ...__main__` block):



async def get_price(pool, crypto_id, url):
    try:
        candlesticks = []
        # consider having a single client application-wide - you are creating one per task
        async with BinanceLimitGuard():
            client = await AsyncClient.create(config.BINANCE_API_KEY, config.BINANCE_SECRET_KEY)

        # as the actual calls to the remote endpoint are made inside the client code itself,
        # we can't just run "async for" on the generator - instead we have to throttle
        # each "for" iteration. So we "unfold" the async for into a while/__anext__
        # structure so that we can place the guard before each iteration:
        klines_generator = await client.get_historical_klines_generator(
            crypto_id, AsyncClient.KLINE_INTERVAL_1HOUR, "18 Aug, 2021", "19 Aug, 2021")
        while True:
            try:
                async with BinanceLimitGuard():
                    kline = await klines_generator.__anext__()
            except StopAsyncIteration:
                break
            candlesticks.append(kline)

        df = pd.DataFrame(candlesticks, columns = ["date","open","high","low","close","volume","Close time","Quote Asset Volume","Number of Trades","Taker buy base asset volume","Taker buy quote asset volume","Ignore"])
        df["date"] = pd.to_datetime(df.loc[:, "date"], unit ='ms')
        df.drop(columns=['Close time','Ignore', 'Quote Asset Volume', 'Number of Trades', 'Taker buy base asset volume', 'Taker buy quote asset volume'], inplace=True)
        df.loc[:, "id"] = crypto_id
        print(df)
    except Exception as e:
        print("Unable to get {} prices due to {}.".format(url, e.__class__))
        print(e)

BinanceLimitGuard = create_rate_limit_guard(300)

if __name__ == "__main__":
    # all code that is meant to run when your file is executed as a program
    # should be guarded in this if block. Importing your file should not "print"
    start = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    end = time.time()

    print("Took {} seconds.".format(end - start))

Note that although I designed the guard as "1200 requests per minute", I suggest throttling the parallel tasks to "300" per minute above, in `BinanceLimitGuard = create_rate_limit_guard(300)`: checking the source code of the Binance client itself, a single call to `get_historical_klines` performs several requests of its own, and that code embeds its own limit of 3 calls per second; but that happens inside each generator, so there is no way to account for those calls in the outer code.

If this still doesn't work, the rate limiting can be applied to the AsyncClient itself, by subclassing it (or monkey-patching) and throttling its internal `_request_api` method, at this point: https://github.com/sammchardy/python-binance/blob/a6f3048527f0f2fd9bc6591ac1fdd926b2a29f3e/binance/client.py#L330 - then you could go back to the "1200 limit", since it would account for all the internal calls. (If you need to resort to that, leave a comment and I can complete this answer or add another one.)
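That subclassing idea could look roughly like the sketch below. Since it depends on python-binance internals (`_request_api` is a private method whose signature may change between versions), the pattern is demonstrated on a stand-in base class rather than the real `AsyncClient`; substituting `binance.client.AsyncClient` for the fake is left as an exercise:

```python
import asyncio, time

class FakeAsyncClient:
    """Stand-in for binance.client.AsyncClient; only the method we override."""
    async def _request_api(self, method, path, **kwargs):
        return {"method": method, "path": path}

def make_throttled(base, min_interval):
    """Build a subclass of `base` whose _request_api waits out `min_interval`
    seconds between any two requests, shared across all instances."""
    class Throttled(base):
        _last_request = 0.0
        _lock = None

        async def _request_api(self, *args, **kwargs):
            cls = Throttled
            if cls._lock is None:
                cls._lock = asyncio.Lock()
            async with cls._lock:  # serialize the interval bookkeeping
                elapsed = time.monotonic() - cls._last_request
                if elapsed < min_interval:
                    await asyncio.sleep(min_interval - elapsed)
                cls._last_request = time.monotonic()
            return await super()._request_api(*args, **kwargs)
    return Throttled

async def demo():
    client = make_throttled(FakeAsyncClient, 0.01)()
    return [await client._request_api("get", f"/kline/{i}") for i in range(3)]

print(asyncio.run(demo()))
```

Because every public call in the client funnels through `_request_api`, throttling at this single choke point counts the internal requests made by `get_historical_klines` as well, which is why the "1200" figure becomes usable again.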

【Discussion】:

  • I definitely need time to digest this! Thank you very much for your help and expertise. I'd love to be able to resort to this!
  • The original recipe ended up being quite useful as a generic construct. Right after I finished writing it, I found it doesn't apply cleanly to the binance api (hence the caveats and the code unfolding in the last part).