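【Title】: Semaphore for limiting requests per second doesn't work
【Posted】: 2021-06-07 19:29:53
【Question】:

I'm using Google Analytics, and that service limits the number of concurrent requests to 10. I had to throttle my API somehow, so I decided to use a semaphore, but it doesn't seem to work. All requests are fired at the same time. I can't find the problem in my code.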

public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
{
    var todayVisits = _googleAnalyticsService.GetTodayVisitsNumber();
    var todayTraffic = _googleAnalyticsService.GetTodayTraffic();
    var newAndReturningUsers = _googleAnalyticsService.GetNewAndReturningUsersNumber();
    var averageSessionDuration = _googleAnalyticsService.GetAverageSessionDuration();
    var deviceCategory = _googleAnalyticsService.GetSessionNumberByDeviceCategory();
    var topPages = _googleAnalyticsService.GetTodaysTopPages();
    var guestsAndRegisteredUsers = _googleAnalyticsService.GetGuestsVsRegisteredUsers();
    var averageNumberOfSessionsPerDay = _googleAnalyticsService.GetAverageSessionsNumber();
    var visitsPerWeekday = _googleAnalyticsService.GetTrafficByWeekday();
    var visitsByHours = _googleAnalyticsService.GetTrafficByTimeOfDay();
    var usersByPrefectures = _googleAnalyticsService.GetUsersByPrefectures();
    var usersByCountry = _googleAnalyticsService.GetUsersByCountry();

    var tasks = new List<Task>()
        {
            todayVisits, todayTraffic, newAndReturningUsers,
            averageSessionDuration, deviceCategory, topPages,
            guestsAndRegisteredUsers, averageNumberOfSessionsPerDay, visitsPerWeekday,
            visitsByHours, usersByPrefectures, usersByCountry
        };

    var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);

    foreach(var task in tasks)
    {
        await throttler.WaitAsync();

        try
        {
            await task;
            await Task.Delay(1000); // It's important due to limits of Google Analytics requests (10 queries per second per IP address)
        }
        finally
        {
            throttler.Release();
        }
    }

    await Task.WhenAll(tasks);

    return new SiteAnalyticsDTO()
        {
            TodayVisits = await todayVisits,
            TodayTraffic = await todayTraffic,
            NewAndReturningUsers = await newAndReturningUsers,
            AverageSessionDuration = await averageSessionDuration,
            DeviceCategory = await deviceCategory,
            TopPages = await topPages,
            GuestsAndRegisteredUsers = await guestsAndRegisteredUsers,
            AverageNumberOfSessionsPerDay = await averageNumberOfSessionsPerDay,
            VisitsPerWeekday = await visitsPerWeekday,
            VisitsByHours = await visitsByHours,
            UsersByPrefectures = await usersByPrefectures,
            UsersByCountry = await usersByCountry
        };
}

Here is an example of the methods that call Google Analytics:

public async Task<int> GetTodayVisitsNumber(List<long> listingIds = null)
{
    string filter = GetFilter(listingIds);

    var getReportsRequest = GetReportsRequestModel(GetTodayDateRange(), "ga:sessionCount", "ga:sessions", _configuration.MainViewId, filter);
    var response = await _service.Reports.BatchGet(getReportsRequest).ExecuteAsync();
    Console.WriteLine(response);
    var data = response.Reports.FirstOrDefault();

    return Convert.ToInt32(data?.Data.Totals[0].Values[0]);
}

【Discussion】:

    Tags: c# .net-core async-await google-analytics semaphore


    【Answer 1】:

    "All requests are fired at the same time."

    Let's take a look here:

    var todayVisits = _googleAnalyticsService.GetTodayVisitsNumber();
    var todayTraffic = _googleAnalyticsService.GetTodayTraffic();
    var newAndReturningUsers = _googleAnalyticsService.GetNewAndReturningUsersNumber();
    var averageSessionDuration = _googleAnalyticsService.GetAverageSessionDuration();
    var deviceCategory = _googleAnalyticsService.GetSessionNumberByDeviceCategory();
    var topPages = _googleAnalyticsService.GetTodaysTopPages();
    var guestsAndRegisteredUsers = _googleAnalyticsService.GetGuestsVsRegisteredUsers();
    var averageNumberOfSessionsPerDay = _googleAnalyticsService.GetAverageSessionsNumber();
    var visitsPerWeekday = _googleAnalyticsService.GetTrafficByWeekday();
    var visitsByHours = _googleAnalyticsService.GetTrafficByTimeOfDay();
    var usersByPrefectures = _googleAnalyticsService.GetUsersByPrefectures();
    var usersByCountry = _googleAnalyticsService.GetUsersByCountry();
    

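    You are storing the result of each method call. When you use parentheses, as in "methodName();", you invoke the method and store what it returns in the var. For an async method, what it returns is a Task that is already running.

    You then put those tasks in a list and await each one behind the semaphore, hoping to limit how many run at once.

    The problem is that every request was already started when you called the methods above. Each await only waits for work that is already in flight.

    That leads you to believe the SemaphoreSlim isn't working: it only throttles the awaiting, and since every Task was started when its method was called, nothing spaces the requests out.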
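A minimal sketch of why the requests fire at once. `FakeRequest` is a hypothetical stand-in for one of the Google Analytics calls, not part of the original code. It logs when it starts, so the log shows that both calls begin before the first `await` is reached.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class HotTaskDemo
{
    // Hypothetical stand-in for a Google Analytics call; records when it starts.
    public static async Task<int> FakeRequest(int id, List<string> log)
    {
        lock (log) log.Add($"request {id} started");
        await Task.Delay(100); // simulated network latency
        return id;
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // Calling the methods starts both operations right here.
        Task<int> first = FakeRequest(1, log);
        Task<int> second = FakeRequest(2, log);

        lock (log) log.Add("before first await");

        // These awaits only wait for work that is already in flight.
        await first;
        await second;
        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
            Console.WriteLine(line);
    }
}
```

Both "started" lines are logged before "before first await", so throttling the awaits cannot delay the requests themselves.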

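    Store the async methods for later use instead of invoking them all up front.
    You can't store the delegates in a var like that; you have to store them in explicitly typed variables such as Func<TResult>.

    For example: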

    Func<Task<object>> todayVisits = _googleAnalyticsService.GetTodayVisitsNumber;
    

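    Editor's note: I don't know what these methods return, so I substituted object to keep it as generic as possible.

    Now, storing each of these in its own variable would be cumbersome, so instead of separate variables let's put them straight into a list like this: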

    var awaitableTasks = new List<Func<Task<object>>>()
    {
        _googleAnalyticsService.GetTodayVisitsNumber,
        _googleAnalyticsService.GetTodayTraffic,
        _googleAnalyticsService.GetNewAndReturningUsersNumber,
        _googleAnalyticsService.GetAverageSessionDuration,
        _googleAnalyticsService.GetSessionNumberByDeviceCategory,
        _googleAnalyticsService.GetTodaysTopPages,
        _googleAnalyticsService.GetGuestsVsRegisteredUsers,
        _googleAnalyticsService.GetAverageSessionsNumber,
        _googleAnalyticsService.GetTrafficByWeekday,
        _googleAnalyticsService.GetTrafficByTimeOfDay,
        _googleAnalyticsService.GetUsersByPrefectures,
        _googleAnalyticsService.GetUsersByCountry
    };
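One caveat about the list above: `Task<T>` is not covariant, so a method that returns `Task<int>` cannot be assigned directly to a `Func<Task<object>>`. As far as I know, a method group with an optional parameter also won't convert to a parameterless delegate. The sketch below shows one possible workaround using a small adapter. `Box`, the `Key`/`Call` tuple and the two stand-in methods are assumptions made for illustration; the real service methods and return types may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DelegateWrappingDemo
{
    // Hypothetical stand-ins for the service methods; real return types may differ.
    public static async Task<int> GetTodayVisitsNumber(List<long> listingIds = null)
    {
        await Task.Delay(10);
        return 42;
    }

    public static async Task<string> GetTodayTraffic()
    {
        await Task.Delay(10);
        return "organic";
    }

    // Adapts a Func<Task<T>> to a Func<Task<object>> without starting the work.
    public static Func<Task<object>> Box<T>(Func<Task<T>> source) =>
        async () => (object)await source();

    // Each entry pairs an explicit key with a deferred call.
    public static List<(string Key, Func<Task<object>> Call)> BuildList() =>
        new List<(string Key, Func<Task<object>> Call)>
        {
            (nameof(GetTodayVisitsNumber), Box(() => GetTodayVisitsNumber())),
            (nameof(GetTodayTraffic), Box(() => GetTodayTraffic())),
        };

    public static async Task Main()
    {
        foreach (var (key, call) in BuildList())
            Console.WriteLine($"{key}: {await call()}");
    }
}
```

The explicit key matters because a lambda's method name is compiler-generated. Once the calls are wrapped in lambdas, looking up results by `GetMethodInfo().Name` no longer matches the `nameof(...)` keys.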
    

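    Because these new objects are not Tasks themselves but methods that return a Task, we have to change how we store and invoke them. We'll use a local method for that, so I'll walk through each change I made.

    Let's create the semaphore, plus a place to put our tasks so we can keep track of them.

    We can also create a place to store each task's result once we await it.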

    var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);
    
    var tasks = new List<Task>();
    
    ConcurrentDictionary<string, object> resultDict = new();
    

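    Let's create a local method with a few responsibilities:

    1. Accept a Func<Task<object>> as a parameter
    2. Await the method
    3. Put the method's result somewhere we can retrieve it later
    4. Release the semaphore even if we hit an error
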
    async Task Worker(Func<Task<object>> awaitableFunc)
    {
        try
        {
            resultDict.TryAdd(awaitableFunc.GetMethodInfo().Name, await awaitableFunc());
        }
        finally
        {
            throttler.Release();
        }
    }
    

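    Editor's note: you could do the same with a lambda expression, but I prefer a local method for clarity and formatting.

    Start the workers and store the tasks they return.

    That way ... the final object).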

    foreach (var task in awaitableTasks)
    {
        await throttler.WaitAsync();
        tasks.Add(Task.Run(() => Worker(task)));
    }
    
    // wait for the tasks to finish
    await Task.WhenAll(tasks);
    

    Create the final object and return it.

    return new SiteAnalyticsDTO()
    {
        TodayVisits = resultDict[nameof(_googleAnalyticsService.GetTodayVisitsNumber)],
        TodayTraffic = resultDict[nameof(_googleAnalyticsService.GetTodayTraffic)],
        NewAndReturningUsers = resultDict[nameof(_googleAnalyticsService.GetNewAndReturningUsersNumber)],
        AverageSessionDuration = resultDict[nameof(_googleAnalyticsService.GetAverageSessionDuration)],
        DeviceCategory = resultDict[nameof(_googleAnalyticsService.GetSessionNumberByDeviceCategory)],
        TopPages = resultDict[nameof(_googleAnalyticsService.GetTodaysTopPages)],
        GuestsAndRegisteredUsers = resultDict[nameof(_googleAnalyticsService.GetGuestsVsRegisteredUsers)],
        AverageNumberOfSessionsPerDay = resultDict[nameof(_googleAnalyticsService.GetAverageSessionsNumber)],
        VisitsPerWeekday = resultDict[nameof(_googleAnalyticsService.GetTrafficByWeekday)],
        VisitsByHours = resultDict[nameof(_googleAnalyticsService.GetTrafficByTimeOfDay)],
        UsersByPrefectures = resultDict[nameof(_googleAnalyticsService.GetUsersByPrefectures)],
        UsersByCountry = resultDict[nameof(_googleAnalyticsService.GetUsersByCountry)]
    };
    

    Putting it all together, I think we have something that works, or that can at least be easily adapted to your needs.

    public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
    {
        // requires: using System.Collections.Concurrent; using System.Reflection; (for GetMethodInfo)
        // store these methods so we can iterate and execute them later
        var awaitableTasks = new List<Func<Task<object>>>()
        {
            _googleAnalyticsService.GetTodayVisitsNumber,
            _googleAnalyticsService.GetTodayTraffic,
            _googleAnalyticsService.GetNewAndReturningUsersNumber,
            _googleAnalyticsService.GetAverageSessionDuration,
            _googleAnalyticsService.GetSessionNumberByDeviceCategory,
            _googleAnalyticsService.GetTodaysTopPages,
            _googleAnalyticsService.GetGuestsVsRegisteredUsers,
            _googleAnalyticsService.GetAverageSessionsNumber,
            _googleAnalyticsService.GetTrafficByWeekday,
            _googleAnalyticsService.GetTrafficByTimeOfDay,
            _googleAnalyticsService.GetUsersByPrefectures,
            _googleAnalyticsService.GetUsersByCountry
        };
    
        // create a way to limit the number of concurrent requests
        var throttler = new SemaphoreSlim(MaxRequests, MaxRequests);
    
        // create a place to store the tasks we create
        var finalTasks = new List<Task>();
    
        // make sure we have somewhere to put our results
        ConcurrentDictionary<string, object> resultDict = new();
    
        // make a worker that accepts one of those methods, invokes it
        // then adds the result to the dict
        async Task Worker(Func<Task<object>> awaitableFunc)
        {
            try
            {
                resultDict.TryAdd(awaitableFunc.GetMethodInfo().Name, await awaitableFunc());
            }
            finally
            {
                // make sure even if we encounter an error we still release the semaphore
                throttler.Release();
            }
        }
    
        // iterate over the tasks, wait for the semaphore
        // when we get a slot, create a worker and send it to the background
        foreach (var task in awaitableTasks)
        {
            await throttler.WaitAsync();
            finalTasks.Add(Task.Run(() => Worker(task)));
        }
    
        // wait for any remaining tasks to finish up in the background if they are still running
        await Task.WhenAll(finalTasks);
    
        // create the return object from the results of the dictionary
        return new SiteAnalyticsDTO()
        {
            TodayVisits = resultDict[nameof(_googleAnalyticsService.GetTodayVisitsNumber)],
            TodayTraffic = resultDict[nameof(_googleAnalyticsService.GetTodayTraffic)],
            NewAndReturningUsers = resultDict[nameof(_googleAnalyticsService.GetNewAndReturningUsersNumber)],
            AverageSessionDuration = resultDict[nameof(_googleAnalyticsService.GetAverageSessionDuration)],
            DeviceCategory = resultDict[nameof(_googleAnalyticsService.GetSessionNumberByDeviceCategory)],
            TopPages = resultDict[nameof(_googleAnalyticsService.GetTodaysTopPages)],
            GuestsAndRegisteredUsers = resultDict[nameof(_googleAnalyticsService.GetGuestsVsRegisteredUsers)],
            AverageNumberOfSessionsPerDay = resultDict[nameof(_googleAnalyticsService.GetAverageSessionsNumber)],
            VisitsPerWeekday = resultDict[nameof(_googleAnalyticsService.GetTrafficByWeekday)],
            VisitsByHours = resultDict[nameof(_googleAnalyticsService.GetTrafficByTimeOfDay)],
            UsersByPrefectures = resultDict[nameof(_googleAnalyticsService.GetUsersByPrefectures)],
            UsersByCountry = resultDict[nameof(_googleAnalyticsService.GetUsersByCountry)]
        };
    }
    

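    【Discussion】:

    • Interesting, not gonna lie, that was my suspicion: I awaited it too early. But why, in my code, is the console log triggered inside the foreach rather than right after initialization? True, in the first iteration all of them are logged, but still, why?
    【Answer 2】:

    The problem with your setup is that all the tasks are started at the same time, and only their awaiting is throttled. Throttling the awaiting has no useful effect. Only your continuations are delayed. The target service still receives all the requests in one burst.

    My suggestion is to use a dedicated class that encapsulates the throttling logic. It seems you need to limit both the concurrency and the rate at which requests are sent, and each of these limits can be enforced with a separate SemaphoreSlim. Here is a simple implementation: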

    public class ThrottledExecution
    {
        private readonly SemaphoreSlim _concurrencySemaphore;
        private readonly SemaphoreSlim _delaySemaphore;
        private readonly TimeSpan _delay;
    
        public ThrottledExecution(int concurrencyLimit, TimeSpan rateLimitTime,
            int rateLimitCount)
        {
            // Arguments validation omitted
            _concurrencySemaphore = new SemaphoreSlim(concurrencyLimit, concurrencyLimit);
            _delaySemaphore = new SemaphoreSlim(rateLimitCount, rateLimitCount);
            _delay = rateLimitTime;
        }
    
        public async Task<TResult> Run<TResult>(Func<Task<TResult>> action)
        {
            await _delaySemaphore.WaitAsync();
            ScheduleDelaySemaphoreRelease();
            await _concurrencySemaphore.WaitAsync();
            try { return await action().ConfigureAwait(false); }
            finally { _concurrencySemaphore.Release(); }
        }
    
        private async void ScheduleDelaySemaphoreRelease()
        {
            await Task.Delay(_delay).ConfigureAwait(false);
            _delaySemaphore.Release();
        }
    }
    

    You could use it like this:

    public async Task<SiteAnalyticsDTO> Handle(GetSiteAnalyticsParameter query)
    {
        var throttler = new ThrottledExecution(MaxRequests, TimeSpan.FromSeconds(1), 1);
    
        var todayVisits = throttler.Run(() => _googleAnalyticsService.GetTodayVisitsNumber());
        var todayTraffic = throttler.Run(() => _googleAnalyticsService.GetTodayTraffic());
        var newAndReturningUsers = throttler.Run(() => _googleAnalyticsService.GetNewAndReturningUsersNumber());
        var averageSessionDuration = throttler.Run(() => _googleAnalyticsService.GetAverageSessionDuration());
        var deviceCategory = throttler.Run(() => _googleAnalyticsService.GetSessionNumberByDeviceCategory());
        var topPages = throttler.Run(() => _googleAnalyticsService.GetTodaysTopPages());
        var guestsAndRegisteredUsers = throttler.Run(() => _googleAnalyticsService.GetGuestsVsRegisteredUsers());
        var averageNumberOfSessionsPerDay = throttler.Run(() => _googleAnalyticsService.GetAverageSessionsNumber());
        var visitsPerWeekday = throttler.Run(() => _googleAnalyticsService.GetTrafficByWeekday());
        var visitsByHours = throttler.Run(() => _googleAnalyticsService.GetTrafficByTimeOfDay());
        var usersByPrefectures = throttler.Run(() => _googleAnalyticsService.GetUsersByPrefectures());
        var usersByCountry = throttler.Run(() => _googleAnalyticsService.GetUsersByCountry());
    
        var tasks = new List<Task>()
        {
            todayVisits, todayTraffic, newAndReturningUsers,
            averageSessionDuration, deviceCategory, topPages,
            guestsAndRegisteredUsers, averageNumberOfSessionsPerDay, visitsPerWeekday,
            visitsByHours, usersByPrefectures, usersByCountry
        };
        await Task.WhenAll(tasks);
    
        return new SiteAnalyticsDTO()
        {
            TodayVisits = await todayVisits,
            TodayTraffic = await todayTraffic,
            NewAndReturningUsers = await newAndReturningUsers,
            AverageSessionDuration = await averageSessionDuration,
            DeviceCategory = await deviceCategory,
            TopPages = await topPages,
            GuestsAndRegisteredUsers = await guestsAndRegisteredUsers,
            AverageNumberOfSessionsPerDay = await averageNumberOfSessionsPerDay,
            VisitsPerWeekday = await visitsPerWeekday,
            VisitsByHours = await visitsByHours,
            UsersByPrefectures = await usersByPrefectures,
            UsersByCountry = await usersByCountry,
        };
    }
    

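    It seems that partially successful results are of no use to you, so you could consider adding some automatic cancellation logic to the ThrottledExecution class: if one task fails, all pending and subsequent asynchronous operations should be canceled.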
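One possible shape for that cancellation logic, as a rough sketch rather than a definitive implementation. `CancellingThrottle` is a hypothetical, simplified variant that keeps only the concurrency limit (the rate-limit semaphore is omitted for brevity) and cancels a shared token on the first failure. The demo methods `Fail` and `Succeed` are likewise made up for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant: concurrency limit only, plus cancel-on-first-failure.
public class CancellingThrottle
{
    private readonly SemaphoreSlim _semaphore;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    public CancellingThrottle(int concurrencyLimit)
    {
        _semaphore = new SemaphoreSlim(concurrencyLimit, concurrencyLimit);
    }

    public async Task<TResult> Run<TResult>(Func<CancellationToken, Task<TResult>> action)
    {
        // Throws OperationCanceledException once an earlier operation has failed.
        await _semaphore.WaitAsync(_cts.Token);
        try
        {
            return await action(_cts.Token);
        }
        catch (Exception ex) when (!(ex is OperationCanceledException))
        {
            _cts.Cancel(); // cancel queued operations and signal the running ones
            throw;
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

public static class CancellingThrottleDemo
{
    private static async Task<int> Fail(CancellationToken token)
    {
        await Task.Delay(50, token);
        throw new InvalidOperationException("simulated failure");
    }

    private static async Task<int> Succeed(CancellationToken token)
    {
        await Task.Delay(50, token);
        return 1;
    }

    public static async Task<(bool FirstFailed, bool SecondCanceled)> RunAsync()
    {
        var throttle = new CancellingThrottle(1);
        Task<int> failing = throttle.Run<int>(Fail);
        Task<int> queued = throttle.Run<int>(Succeed); // waits behind the first one

        bool firstFailed = false, secondCanceled = false;
        try { await failing; } catch (InvalidOperationException) { firstFailed = true; }
        try { await queued; } catch (OperationCanceledException) { secondCanceled = true; }
        return (firstFailed, secondCanceled);
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

The operations receive the token so that requests already in flight can stop early if the underlying API accepts a CancellationToken. Whether the Google Analytics client calls in the question do is not something the thread confirms.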

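    【Discussion】:

    • Regarding ThrottledExecution(int concurrencyLimit, TimeSpan rateLimitTime, int rateLimitCount): shouldn't rateLimitCount and concurrencyLimit have the same value, so in my case 10 (MaxRequests) instead of 1?
    • @DiPix No, the concurrency limit and the rate limit are independent of each other. The concurrency limit is the number of operations that can be in flight at the same time, so the duration of the operations matters. The rate limit is how many operations can be started within any (sliding) time window, so the duration of the operations is irrelevant. If you only want to limit the concurrency and not the rate, you can pass these values: TimeSpan.Zero and Int32.MaxValue
    • I see. I ask because with your example the whole operation currently takes as long as 12 seconds, instead of just 2 seconds.
    • @DiPix Yes, in the example I configured ThrottledExecution with a rate of one operation per second, which is probably incorrect. According to the code comment in your question it should be ten operations per second, so rateLimitCount should be 10 instead of 1
    • @DiPix I could add a Run method to the RateLimiter, but I would prefer to keep it as a lightweight component, similar to SemaphoreSlim. Here is the Run method, if you want to use it.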