【问题标题】:Most efficient way to process a queue with threads使用线程处理队列的最有效方法
【发布时间】:2011-09-06 10:12:51
【问题描述】:

我有一个队列,其中放置了待处理的傅立叶变换请求(相对耗时的操作) - 在某些情况下,我们每秒可以收到数千个变换请求,所以它必须很快。

我正在升级旧代码以使用 .net 4,以及移植到 TPL。我想知道处理这个队列的最有效(最快的吞吐量)方法是什么样的。我想使用所有可用的内核。

目前我正在尝试使用 BlockingCollection。我创建了一个生成 4 个任务的队列处理程序类,这些任务在 BlockingCollection 上阻塞并等待传入​​的工作。然后,他们处理该挂起的转换。代码:

public class IncomingPacketQueue : IDisposable
    {
        BlockingCollection<IncomingPacket> _packetQ = new BlockingCollection<IncomingPacket>();

        public IncomingPacketQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public void EnqueueSweep(IncomingPacket incoming)
        {
            _packetQ.Add(incoming);
        }

        private void Consume()
        {
            foreach (var sweep in _packetQ.GetConsumingEnumerable())
            {
                //do stuff
                var worker = new IfftWorker();
                Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                worker.DoIfft(sweep);                

            }
        }

        public int QueueCount
        {
            get
            {
                return _packetQ.Count;
            }
        }

    #region IDisposable Members

    public void Dispose()
    {
        _packetQ.CompleteAdding();
    }

    #endregion
    }

这看起来是一个好的解决方案吗?似乎所有核心​​都最大化了——尽管我目前不确定我应该在构造函数中生成多少工人。

【问题讨论】:

  • 附带说明,您还可以使用英特尔 IPP 来加速傅立叶变换工作(然后在一个内核上运行的每个线程将使用 SSE 指令来完成工作),但是您很高兴互操作的 ;-)

标签: c# multithreading c#-4.0 task-parallel-library


【解决方案1】:

这看起来很合理。我发现BlockingCollection 非常快。我用它每秒处理数万个请求。

如果您的应用程序受处理器限制,那么您可能不想创建比您拥有的内核更多的工作程序。当然,您不想创建比核心更多的工人。在四核机器上,如果您预计大部分时间都花在 FFT 上,那么四个工作人员将吃掉所有 CPU。更多的工人只是意味着你有更多的线程上下文切换来处理。 TPL 通常会为您平衡这一点,但是当您无法处理的工作人员数量超过少数时,没有理由创建 100 个工作人员。

我建议您使用 3、4、5、6、7 和 8 名工作人员运行测试。看看哪一个可以为您提供最佳吞吐量。

【讨论】:

  • 4 给了我最好的——虽然里面没有多少。我将坚持这一点,并使数字可配置,如果未设置则默认为 Env.ProcessorCount。
【解决方案2】:

我同意吉姆的观点。你的方法看起来真的很好。你不会变得更好。我不是 FFT 专家,但我假设这些操作几乎 100% 受 CPU 限制。如果确实如此,那么对工人数量的一个很好的初步猜测将是与机器中的核心数量直接一对一的相关性。您可以使用Environment.ProcessorCount 来获取此值。您可以尝试使用 2x 或 4x 的乘数,但同样,如果这些操作受 CPU 限制,那么任何高于 1x 的操作都可能会导致更多开销。使用Environment.ProcessorCount 将使您的代码更具可移植性。

另一个建议...让 TPL 知道这些是专用线程。您可以通过指定 LongRunning 选项来做到这一点。

public IncomingPacketQueue()
{
    for (int i = 0; i < Environment.ProcessorCount; i++)
    {
        Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }
}

【讨论】:

  • 我同意,但您可能还想忽略超线程中的核心,只考虑真正的核心。
  • Env.ProcessorCount 的好技巧...对我来说应该很有效。
【解决方案3】:

为什么不使用 Parallel.ForEach 并让 TPL 处理创建的线程数。

        Parallel.ForEach(BlockingCollectionExtensions.GetConsumingPartitioneenter(_packetQ),
                         sweep => {
                           //do stuff
                           var worker = new IfftWorker();
                           Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                           worker.DoIfft(sweep);                

                         });

(GetConsumingPartitioner 是ParallelExtensionsExtras 的一部分)

【讨论】:

  • 这看起来也是一个不错的解决方案 - 我会用它来看看我得到了什么:)
【解决方案4】:

使工作人员的数量可配置。还有太多的工人,它会变慢(如另一张海报所示),所以你需要找到最佳位置。可配置的值将允许测试运行找到最佳值,或者允许您的程序适用于不同类型的硬件。您当然可以将此值放在 App.Config 中并在启动时读取它。

【讨论】:

    【解决方案5】:

    您还可以尝试使用 PLINQ 来并行化处理,看看它与您当前使用的方法相比如何。它有一些技巧可以使它在某些情况下非常高效。

    _packetQ.GetConsumingEnumerable().AsParallel().ForAll(
        sweep => new IfftWorker().DoIfft(sweep));
    

    【讨论】:

    • 您不能将 PLINQ 与 BlockingCollection 一起使用。默认分区程序可能会丢失项目或死锁。始终使用 ParallelExtensionsExtras 中的 BlockingCollectionPartitioner
    猜你喜欢
    • 1970-01-01
    • 2021-10-27
    • 1970-01-01
    • 1970-01-01
    • 2012-11-12
    • 1970-01-01
    • 2013-09-10
    • 2015-09-27
    • 1970-01-01
    相关资源
    最近更新 更多