【Question Title】: Limit total # of parallel threads spawned by nested parallel loops
【Posted】: 2014-06-27 19:46:32
【Question】:

So. I have a routine that crawls a network. The routine is seeded with a list of IPs and follows the IPs it finds as it crawls devices. When it finds new IPs, it crawls those too.

Here's my problem. I run the initial scan of the seed IPs in a parallel foreach, and the IPs I find on each device are launched in a nested parallel foreach, so I can end up with 10 threads each spawning 10 threads, for 100 threads total (or more if those threads find their own devices). I want to limit the total number of threads the whole process uses (say, to 25).

Can this be done with C#'s task library?

I know about the MaxDegreeOfParallelism property on the foreach loop, but can it be shared?

【Question Discussion】:

  • Yes, ParallelOptions can be shared.
  • Your question talks about threads, but Parallel.ForEach uses tasks, which don't map 1-1 to threads. What makes you think you need to tune the degree of parallelism?
  • elios, sure, you can share the object, but it doesn't limit the total number of threads/tasks the routine uses. Kevin, tasks vs. threads doesn't really matter here. In my use case I sometimes need to limit total CPU usage for performance reasons.
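To illustrate the point in the comments above: a single ParallelOptions instance can legally be passed to both loops, but the cap applies to each loop independently, so nested loops can still multiply the concurrency. A minimal sketch (the peak-tracking counters are illustration only, not part of the original question):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SharedOptionsDemo
{
    static int current, peak;

    static void Main()
    {
        // One shared options object: legal, but the limit is per loop, not global.
        var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

        Parallel.ForEach(new[] { 1, 2, 3, 4 }, options, outer =>
        {
            Parallel.ForEach(new[] { 10, 20, 30, 40 }, options, inner =>
            {
                // Track the peak number of inner bodies running at once.
                int now = Interlocked.Increment(ref current);
                int seen;
                do { seen = Volatile.Read(ref peak); }
                while (now > seen &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen);
                Thread.Sleep(50); // hold the slot so overlaps become visible
                Interlocked.Decrement(ref current);
            });
        });

        // Each loop honors its own cap of 2, but up to 2 outer x 2 inner = 4
        // inner bodies may run at once, so the shared object does not bound
        // the total at 2.
        Console.WriteLine("peak concurrent inner bodies: " + peak);
    }
}
```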

Tags: c# parallel.foreach


【Solution 1】:

What about pushing those tasks to a shared task factory? How to: Create a Task Scheduler That Limits Concurrency

【Discussion】:

    【Solution 2】:

    Agent Shark's answer did the trick. I thought I'd share my working example and talk about a couple of things I ran into.

    At first I used nested Parallel.ForEach loops. But it occurred to me that if I capped the thread/task count at a number smaller than the total in the first loop, there would be no threads left to process the second loop, so the routine would never finish. So that simply wouldn't work.

    That left Agent Shark's idea of a shared queue that tasks are pushed onto and run as threads are freed up.

    Here's what my solution looks like.

        using System;
        using System.Collections.Concurrent;
        using System.Collections.Generic;
        using System.Diagnostics;
        using System.Linq;
        using System.Threading;
        using System.Threading.Tasks;

        internal class Program
        {
            private static void Main(string[] args)
            {
                // set up the concurrency-limited scheduler and its factory
                LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(9);
                TaskFactory f = new TaskFactory(lcts);
    
                // create my shared task queue
                ConcurrentDictionary<string, Task> waiting = new ConcurrentDictionary<string, Task>();
                ConcurrentBag<Task> finished = new ConcurrentBag<Task>();
    
                // some numbers....
                List<int> nums = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
    
                foreach (int n in nums)
                {
                    int i = n; // copy the loop variable; closures capture the variable itself, so without this every task can see the same (final) value of n
                    Task t = f.StartNew(() =>
                    {
                        Debug.WriteLine(i);
    
                        // some more numbers....
                        List<int> other = new List<int>() { 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20 };
    
                        foreach (int nm in other)
                        {
                            int j = i;
                            int w = nm;
                            Task tk = f.StartNew(() =>
                            {
                                Debug.WriteLine(j + "," + w);
                                Thread.Sleep(1000);
                            });
                            waiting.TryAdd(j + "," + w, tk);
                        }
    
                        Thread.Sleep(500);
                    });
                    waiting.TryAdd(i.ToString(), t);
                }
    
                // loop until no further tasks are waiting.
                while (waiting.Count > 0)
                {
                    // wait for the current batch; outer tasks enqueue their inner tasks before completing
                    Task.WaitAll(waiting.Values.ToArray());
    
                    // remove the finished tasks from the waiting list.
                    foreach (KeyValuePair<string, Task> pair in waiting)
                    {
                        if (pair.Value.IsCompleted)
                        {
                            finished.Add(pair.Value);
                            Task o;
                            waiting.TryRemove(pair.Key, out o);
                        }
                    }
    
                    Thread.Sleep(100);
                }
            }
        }
    
        /// <summary>
        /// Provides a task scheduler that ensures a maximum concurrency level while
        /// running on top of the ThreadPool.
        /// </summary>
        public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
        {
            /// <summary>Whether the current thread is processing work items.</summary>
            [ThreadStatic]
            private static bool _currentThreadIsProcessingItems;
    
            /// <summary>The list of tasks to be executed.</summary>
            private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
    
            /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
            private readonly int _maxDegreeOfParallelism;
    
            /// <summary>Whether the scheduler is currently processing work items.</summary>
            private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
    
            /// <summary>
            /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
            /// specified degree of parallelism.
            /// </summary>
            /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
            public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
            {
                if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
                _maxDegreeOfParallelism = maxDegreeOfParallelism;
            }
    
            /// <summary>Queues a task to the scheduler.</summary>
            /// <param name="task">The task to be queued.</param>
            protected sealed override void QueueTask(Task task)
            {
                // Add the task to the list of tasks to be processed.  If there aren't enough
                // delegates currently queued or running to process tasks, schedule another.
                lock (_tasks)
                {
                    _tasks.AddLast(task);
                    if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                    {
                        ++_delegatesQueuedOrRunning;
                        NotifyThreadPoolOfPendingWork();
                    }
                }
            }
    
            /// <summary>
            /// Informs the ThreadPool that there's work to be executed for this scheduler.
            /// </summary>
            private void NotifyThreadPoolOfPendingWork()
            {
                ThreadPool.UnsafeQueueUserWorkItem(_ =>
                {
                    // Note that the current thread is now processing work items.
                    // This is necessary to enable inlining of tasks into this thread.
                    _currentThreadIsProcessingItems = true;
                    try
                    {
                        // Process all available items in the queue.
                        while (true)
                        {
                            Task item;
                            lock (_tasks)
                            {
                                // When there are no more items to be processed,
                                // note that we're done processing, and get out.
                                if (_tasks.Count == 0)
                                {
                                    --_delegatesQueuedOrRunning;
                                    break;
                                }
    
                                // Get the next item from the queue
                                item = _tasks.First.Value;
                                _tasks.RemoveFirst();
                            }
    
                            // Execute the task we pulled out of the queue
                            base.TryExecuteTask(item);
                        }
                    }
                    // We're done processing items on the current thread
                    finally { _currentThreadIsProcessingItems = false; }
                }, null);
            }
    
            /// <summary>Attempts to execute the specified task on the current thread.</summary>
            /// <param name="task">The task to be executed.</param>
            /// <param name="taskWasPreviouslyQueued"></param>
            /// <returns>Whether the task could be executed on the current thread.</returns>
            protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
            {
                // If this thread isn't already processing a task, we don't support inlining
                if (!_currentThreadIsProcessingItems) return false;
    
                // If the task was previously queued, remove it from the queue
                if (taskWasPreviouslyQueued) TryDequeue(task);
    
                // Try to run the task.
                return base.TryExecuteTask(task);
            }
    
            /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
            /// <param name="task">The task to be removed.</param>
            /// <returns>Whether the task could be found and removed.</returns>
            protected sealed override bool TryDequeue(Task task)
            {
                lock (_tasks) return _tasks.Remove(task);
            }
    
            /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
            public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
    
            /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
            /// <returns>An enumerable of the tasks currently scheduled.</returns>
            protected sealed override IEnumerable<Task> GetScheduledTasks()
            {
                bool lockTaken = false;
                try
                {
                    Monitor.TryEnter(_tasks, ref lockTaken);
                    if (lockTaken) return _tasks.ToArray();
                    else throw new NotSupportedException();
                }
                finally
                {
                    if (lockTaken) Monitor.Exit(_tasks);
                }
            }
        }
    

    【Discussion】:

    • I don't quite understand why you have to set int i = n. I think it has to do with closures. If I don't set i = n and just use n, all the threads end up seeing n as 0.
    • Because C# closures capture the variable itself, not its value, so every iteration's lambda shares a reference to the same loop variable and observes whatever value it holds when the lambda finally runs.
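    For what it's worth, the capture behavior described above is easiest to reproduce with a plain for loop (since C# 5 the foreach variable is scoped per iteration, but a for loop still has a single shared variable). A small sketch:

```csharp
using System;
using System.Collections.Generic;

class ClosureCaptureDemo
{
    static void Main()
    {
        var shared = new List<Func<int>>();
        for (int n = 0; n < 3; n++)
            shared.Add(() => n);        // captures the variable n itself

        var copied = new List<Func<int>>();
        for (int n = 0; n < 3; n++)
        {
            int i = n;                  // fresh variable each iteration
            copied.Add(() => i);        // captures this iteration's copy
        }

        // All the shared closures see the loop's final value of n.
        Console.WriteLine(string.Join(",", shared.ConvertAll(f => f().ToString())));  // 3,3,3
        Console.WriteLine(string.Join(",", copied.ConvertAll(f => f().ToString())));  // 0,1,2
    }
}
```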