【问题标题】:Share a BlockingCollection across multiple tasks跨多个任务共享 BlockingCollection
【发布时间】:2012-06-29 05:04:49
【问题描述】:

这是我的场景。我从外部数据源获取大量数据,我必须在两个地方本地写入。其中一个目的地的写入速度非常慢,但另一个目的地非常快(但我不能依靠它来读取和写入缓慢的目的地)。为此,我使用了生产者-消费者模式(使用 BlockingCollection)。

我现在遇到的问题是我必须在两个 BlockingCollection 中对数据进行排队,这会占用太多内存。我的代码看起来与下面的示例非常相似,但我真的很想从一个队列中驱动两个任务。有谁知道这样做的正确方法是什么?下面的代码有什么低效率的地方吗?

class Program
{
    const int MaxNumberOfWorkItems = 15;
    static BlockingCollection<int> slowBC = new BlockingCollection<int>(MaxNumberOfWorkItems);
    static BlockingCollection<int> fastBC = new BlockingCollection<int>(MaxNumberOfWorkItems);

    static void Main(string[] args)
    {
        Task slowTask = Task.Factory.StartNew(() =>
        {
            foreach (var item in slowBC.GetConsumingEnumerable())
            {
                Console.WriteLine("SLOW -> " + item);
                Thread.Sleep(25);
            }
        });

        Task fastTask = Task.Factory.StartNew(() =>
        {
            foreach (var item in fastBC.GetConsumingEnumerable())
            {
                Console.WriteLine("FAST -> " + item);
            }
        });

        // Population two BlockingCollections with the same data. How can I have a single collection?
        for (int i = 0; i < 100; i++)
        {
            while (slowBC.TryAdd(i) == false)
            {
                Console.WriteLine("Wait for slowBC...");
            }

            while (fastBC.TryAdd(i) == false)
            {
                Console.WriteLine("Wait for 2...");
            }
        }

        slowBC.CompleteAdding();
        fastBC.CompleteAdding();

        Task.WaitAll(slowTask, fastTask);

        Console.ReadLine();
    }
}

【问题讨论】:

  • 不是一个真正的答案,而是 FWIW,另一种方法是使用 TPL DataFlow,您可以将 BroadcastBlock 发送到 2 个不同的 ActionBlock,用于 2 个不同的写入路径。这种方法的主要好处是不必手动管理“链接”集合。
  • 同样,当然,你可以使用 Rx,创建一个 observable,并拥有 2 个订阅者 AFAICT。
  • 这是两个非常好的主意,但我必须坚持使用 .NET 4.0(并且没有 RX)中可用的内容...... :(
  • 您在运行时阻塞行为方面的目标是什么?例如,考虑到速度差异,另一种选择是只使用那个 foreach 并让它的主体先写入快的,然后再写入慢的。这会很好,还是让速度快的人“跑在前面”很重要?
  • 一个简单的解决方案是为每个项目启动两个任务。这意味着您将在物品可用时尽快使用它们。另一种选择是同步更新缓存并将数据库更新作为任务启动。这将在缓存允许的情况下以最快的速度消耗项目。

标签: c# .net multithreading task-parallel-library


【解决方案1】:
  1. 使用生产者-消费者队列来传输单个整数是非常低效的。你是在块中接收它,所以为什么不将队列输入为 '*chunk' 并发送整个块,立即在同一个 ref 处创建/分离一个新块。 rx 的变量。下一批数据?这就是 P-C 队列通常用于处理大量数据的方式 - 排队 refs/pointers,而不是实际数据。线程具有共享内存空间(一些开发人员似乎认为这只会导致问题),所以使用它 - 队列指针/引用并将 MB 数据作为一个指针安全地传输。只要您在下一行代码中,总是在将旧线程排队后创建/分离一个新线程,生产者和消费者线程就永远不会在同一个块上运行。

    排队 *chunks 对于大块的效率是 10 倍。

  2. 将 *chunks 发送到快速链接,然后将它们从那里“转发”到慢速链接。

  3. 如果慢速链接不会阻塞您的系统并导致最终的 OOM 错误,您可能需要全面的流量控制。我通常做的是修复总缓冲区大小的“整体”配额,并在启动时创建一个块池(池是另一个 BlockingCollection,在启动时填充 *new(chunks))。生产者线程使块出列,用数据填充它们,将它们排队到 FAST 线程。 FAST 线程处理接收到的块,然后将 *chunk 排队到 SLOW 线程。 SLOW 线程处理相同的数据,然后重新池化“已使用”块以供生产者线程重新使用。这形成了一个流控制系统——如果 SLOW 线程太慢,生产者最终会尝试从一个空池中分离一个 *chunk 并阻塞在那里,直到 SLOW 线程重新池一些使用的 *chunk 并因此通知生产者线程运行再次。您可能需要在慢速线程中使用一些策略来使其操作超时并提前转储它的 *chunk,因此删除数据 - 您必须根据您的整体需求决定一个策略 - 显然不可能将数据连续排队到快速除非缓慢的消费者转储一些数据,否则永远缓慢的消费者不会内存溢出。

编辑 - 哦,是的,使用池消除了对已使用块的 GC,进一步提高了性能。

一个整体的流程策略是不在慢线程中转储任何数据。随着持续的高数据流,*chunks 最终将全部位于快速和慢速线程之间的队列中,并且生产者线程确实会阻塞在空池中。然后,网络连接将应用其自己的流控制来阻止网络对等方通过 TCP 发送更多数据。这将流控制从您的慢线程一直扩展到对等方。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-11-19
    • 1970-01-01
    • 1970-01-01
    • 2019-10-26
    • 1970-01-01
    • 2015-04-13
    • 2018-03-16
    相关资源
    最近更新 更多