【问题标题】:Parallel.ForEach exceed threads limitParallel.ForEach 超出线程限制
【发布时间】:2018-07-25 02:31:52
【问题描述】:

我正在尝试做一个稳定的多线程系统(使用确切数量的线程集)

这是我实际使用的代码:

public void Start()
{

    List<String> list = new List<String>(File.ReadAllLines("urls.txt"));

    int maxThreads = 100;
    var framework = new Sender();

    ThreadPool.SetMinThreads(maxThreads, maxThreads);

    Parallel.ForEach(list, new ParallelOptions { MaxDegreeOfParallelism = maxThreads }, delegate (string url)
    {

        framework.Send(url, "proxy:port");

    });

    Console.WriteLine("Done.");

}

它速度快且工作正常,但超过了 100 个线程限制,如果我使用的代理锁定到 100 个同时连接,则不会有问题,所以我的代理提供商取消了很多请求,任何想法如何在不超过限制的情况下保持线程速度?

谢谢。

【问题讨论】:

  • 您确定问题不在于您的 Sender.Send 方法 - 它可能只是发送请求并立即返回,而不等待结果?
  • 看起来你可能有一个类型-o。你不是要打电话给ThreadPool.SetMaxThreads() 看起来你正在设置最小值。
  • 不,我确定它确实在等待结果,因为如果结果包含 X,它会写入控制台,@w4g3n3r:不,没有 SetMinThreads,Parallel.ForEach 慢得多,与 SetMaxThreads 相同.
  • @PeterWave 仅仅因为它写入控制台并不意味着它不会立即返回。做一个int,在framework.Send之前加一个,在framework.Send之后减去它。制作另一个 int,并存储第一个 int 的最大值。让我知道你得到了什么。 (使用 Interlocked.Increment)
  • @MineR 每个请求都会返回一个唯一的结果,它确实会等待结果,我很确定。

标签: c# multithreading task task-parallel-library parallel.foreach


【解决方案1】:

您的 Framwork.Send 方法立即返回并异步处理。为了验证这一点,我创建了以下测试方法,它按预期工作:

public static void Main()
{
    List<String> list = new List<String>(Enumerable.Range(0,10000).Select(i=>i.ToString()));

    int maxThreads = 100;

    ThreadPool.SetMinThreads(maxThreads, maxThreads);

    int currentCount = 0;
    int maxCount = 0;
    object locker = new object();
    Parallel.ForEach(list, new ParallelOptions { MaxDegreeOfParallelism = maxThreads }, delegate (string url)
    {
        lock (locker)
        {
            currentCount++;
            maxCount = Math.Max(currentCount, maxCount);
        }
        Thread.Sleep(10);
        lock (locker)
        {
            maxCount = Math.Max(currentCount, maxCount);
            currentCount--;
        }
    });

    Console.WriteLine("Max Threads: " + maxCount); //Max Threads: 100
    Console.Read();
}

【讨论】:

  • 您的代码确实显示我正在运行 100 个线程,但我的代理提供程序面板显示我超出了允许的 tcp 连接限制。i.imgur.com/8j8VWoJ.png
  • 嗯,问题不在于 Parallel.ForEach,它在最大线程数的情况下按预期工作。你的问题一定出在其他地方。这个 Sender 类是 .NET 中的东西吗?
  • @PeterWave 您的服务器也可能根据每个时间跨度的多个查询而不是同时查询的数量开始拒绝请求。
  • 谢谢,问题出在你说的不是我的 Parallel.ForEach 而是我的 RequestBuilder 类(由 Sender 类使用),我用另一个库(xNet)重新编码它,它工作正常。 i.imgur.com/CLeRdt6.png
  • @MineR 在Parallel.For 中使用锁和阻塞线程失败它的目的。还不如使用它。它也不需要。 MaxDegreeOfParallelism已经限制线程
【解决方案2】:

Parallel.For/Foreach 用于数据并行 - 处理不需要执行 IO 的大量数据。在这种情况下,没有理由使用比可以运行它们的内核更多的线程。

这个问题是关于网络 IO、并发连接和节流。如果代理提供程序有限制,MaxDegreeOfParallelism 必须设置为足够低的值,以免超出限制。

更好的解决方案是使用 ActionBlock,它具有有限的 MaxDegreeOfParallelism 及其输入缓冲区的限制,因此它不会被等待处理的 url 淹没。

static async Task Main()
{
    var maxConnections=20;
    var options=new ExecutionDataflowBlockOptions 
                {
                    MaxDegreeOfParallelism = maxConnections,
                    BoundedCapacity        = maxConnections * 2
                };
    var framework = new Sender();
    var myBlock=new ActionBlock<string>(url=>
                {
                    framework.Send(...);
                }, options);

    //ReadLines doesn't load everything, it returns an IEnumerable<string> that loads
    //lines as needed
    var lines = File.ReadLines("urls.txt");

    foreach(var url in lines)
    {
        //Send each line to the block, waiting if the buffer is full
        await myBlock.SendAsync(url);
    }
    //Tell the block we are done
    myBlock.Complete();
    //And wait until it finishes everything
    await myBlock.Completion;
}

设置有界容量和 MaxDegreeOfParallelism 有助于解决并发限制,但对请求/秒限制没有帮助。为了限制这一点,可以在每个请求之后添加一个小的延迟。该块的代码必须更改为例如:

    var delay=250; // Milliseconds, 4 reqs/sec per connection
    var myBlock=new ActionBlock<string>( async url=>
                {
                    framework.Send(...);
                    await Task.Delay(delay);
                }, options);

如果Sender.Send 成为异步方法,这可以进一步改进。例如,它可以使用只提供异步方法的 HttpClient,因此它不会阻塞等待响应。变化很小:

    var myBlock=new ActionBlock<string>( async url=>
                {
                    await framework.SendAsync(...);
                    await Task.Delay(delay);
                }, options);

但程序会使用 less 个线程和更少的 CPU - 每次调用 await ... 都会释放当前线程,直到收到响应。

另一方面,阻塞线程与自旋等待有关,这意味着它会浪费 CPU 周期等待响应,然后再使线程进入睡眠状态。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-04-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多