【问题标题】:Limiting number of API requests started per second using Parallel.ForEach使用 Parallel.ForEach 限制每秒启动的 API 请求数
【发布时间】:2016-10-14 14:04:00
【问题描述】:

我正在努力改进我的一些代码以提高效率。在原始代码中,我将允许的线程数限制为 5,如果我已经有 5 个活动线程,我会等到一个完成后再开始另一个。现在我想修改此代码以允许任意数量的线程,但我希望能够确保每秒只有 5 个线程启动。例如:

  • 第二个 0 - 5 个新线程
  • 第二个1-5个新线程
  • 第二个 2 - 5 个新线程 ...

原始代码(cleanseDictionary 通常包含数千个项目):

        ConcurrentDictionary<long, APIResponse> cleanseDictionary = new ConcurrentDictionary<long, APIResponse>();
        ConcurrentBag<int> itemsinsec = new ConcurrentBag<int>();
        ConcurrentDictionary<long, string> resourceDictionary = new ConcurrentDictionary<long, string>();
        DateTime start = DateTime.Now;

        Parallel.ForEach(resourceDictionary, new ParallelOptions { MaxDegreeOfParallelism = 5 }, row =>
        {
            lock (itemsinsec)
            {
                ThrottleAPIRequests(itemsinsec, start);

                itemsinsec.Add(1);
            }

            cleanseDictionary.TryAdd(row.Key, _helper.MakeAPIRequest(string.Format("/endpoint?{0}", row.Value)));
        });


    private static void ThrottleAPIRequests(ConcurrentBag<int> itemsinsec, DateTime start)
    {
        if ((start - DateTime.Now).Milliseconds < 10001 && itemsinsec.Count > 4)
        {
            System.Threading.Thread.Sleep(1000 - (start - DateTime.Now).Milliseconds);
            start = DateTime.Now;
            itemsinsec = new ConcurrentBag<int>();
        }
    }

我的第一个想法是将MaxDegreeofParallelism 增加到更高的值,然后有一个辅助方法,该方法将在一秒钟内仅限制 5 个线程,但我不确定这是否是最好的方法,是否是,我可能需要一个lock 在这一步附近?

提前致谢!

编辑 我实际上正在寻找一种限制 API 请求而不是实际线程的方法。我以为他们是同一个。

编辑 2:我的要求是每秒发送超过 5 个 API 请求

【问题讨论】:

  • Parallel.ForEach 不会启动新线程。它使用许多任务对大量数据进行分区,并让每个任务专门处理该数据。你想用这段代码做什么?当你最多有 5 个并发调用时,为什么你试图“节流”?
  • @PanagiotisKanavos 这是主机设置的要求,在任何给定的秒内只允许 5 个(所以我想)但现在我发现他们的计数器在每一秒过去后都会重置。跨度>
  • 您似乎正在尝试限制 请求,而不是线程。您的实际要求是什么?执行例如最多 5 个并发请求,或 5 个请求/秒?
  • @PanagiotisKanavos 是的,我想这更清楚——我想限制请求,以便每秒只有 5 个请求。我确实希望尽可能多地发出请求
  • 唯一允许对这个问题投反对票的是那些过去必须实施请求限制的人!如果您了解这些技术,只有他们知道这是多么令人困惑!

标签: c# multithreading parallel.foreach


【解决方案1】:

MS 网站上的“Parallel.ForEach”

可以并行运行

如果您想对线程的管理方式进行任何程度的精细控制,这不是办法。
如何创建自己的帮助类,您可以在其中使用组 id 对作业进行排队,允许您等待组 id X 的所有作业完成,并在需要时产生额外的线程?

【讨论】:

  • @PanagiotisKanavos 谁知道 - 在重新阅读问题时,突然间不太清楚问题到底是什么。
【解决方案2】:

对我来说最好的解决方案是:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace SomeNamespace
{
    public class RequestLimiter : IRequestLimiter
    {
        private readonly ConcurrentQueue<DateTime> _requestTimes;
        private readonly TimeSpan _timeSpan;

        private readonly object _locker = new object();

        public RequestLimiter()
        {
            _timeSpan = TimeSpan.FromSeconds(1);
            _requestTimes = new ConcurrentQueue<DateTime>();
        }

        public TResult Run<TResult>(int requestsOnSecond, Func<TResult> function)
        {
            WaitUntilRequestCanBeMade(requestsOnSecond).Wait();
            return function();
        }

        private Task WaitUntilRequestCanBeMade(int requestsOnSecond)
        {
            return Task.Factory.StartNew(() =>
            {
                while (!TryEnqueueRequest(requestsOnSecond).Result) ;
            });
        }

        private Task SynchronizeQueue()
        {
            return Task.Factory.StartNew(() =>
            {
                _requestTimes.TryPeek(out var first);

                while (_requestTimes.Count > 0 && (first.Add(_timeSpan) < DateTime.UtcNow))
                    _requestTimes.TryDequeue(out _);
            });
        }

        private Task<bool> TryEnqueueRequest(int requestsOnSecond)
        {
            lock (_locker)
            {
                SynchronizeQueue().Wait();
                if (_requestTimes.Count < requestsOnSecond)
                {
                    _requestTimes.Enqueue(DateTime.UtcNow);
                    return Task.FromResult(true);
                }
                return Task.FromResult(false);
            }
        }
    }
}

【讨论】:

  • 请添加一些解释,而不是简单地粘贴代码。
  • 为什么每次运行都需要requestsOnSecond参数?
【解决方案3】:

我希望能够每秒发送超过 5 个 API 请求

这真的很简单:

while (true) {
 await Task.Delay(TimeSpan.FromSeconds(1));
 await Task.WhenAll(Enumerable.Range(0, 5).Select(_ => RunRequestAsync()));
}

也许不是最好的方法,因为会有大量请求。这不是连续的。

此外,还有时间偏差。一次迭代需要超过 1 秒。这可以通过几行时间逻辑来解决。

【讨论】:

  • 所以我可以在有要发送的请求时循环。你是什​​么意思这不是连续的?
  • 每秒有一次请求突发。如果您想每小时发送 100 万个请求,这将是一个问题。每 1 秒 5 次可能不是。
  • 我得到的通常计数在 500 到 5000 之间。这会是个问题吗?我正在寻找一种有效的解决方案来每秒发出 5 个请求(或尽可能接近它)
  • 你在说什么“计数”?
  • 我需要发送的 API 请求数量
猜你喜欢
  • 1970-01-01
  • 2018-03-23
  • 1970-01-01
  • 1970-01-01
  • 2013-12-13
  • 2012-02-02
  • 2019-07-20
  • 1970-01-01
  • 2021-11-18
相关资源
最近更新 更多