【问题标题】:Producer/Consumer pattern with a batched producer具有批量生产者的生产者/消费者模式
【发布时间】:2014-07-03 21:20:16
【问题描述】:

我正在尝试实现一个具有多个生产者和一个消费者的相当简单的生产者/消费者风格的应用程序。

研究让我找到了BlockingCollection<T>,它很有用,让我能够实现一个长期运行的消费者任务,如下所示:

var c1 = Task.Factory.StartNew(() =>
{
    var buffer = new List<int>(BATCH_BUFFER_SIZE);

    foreach (var value in blockingCollection.GetConsumingEnumerable())
    {
        buffer.Add(value);
        if (buffer.Count == BATCH_BUFFER_SIZE)
        {
            ProcessItems(buffer);
            buffer.Clear();
        }
    }
});

ProcessItems 函数将缓冲区提交到数据库,并且它确实分批工作。然而,这个解决方案是次优的。在男爵生产期间,可能需要一段时间才能填充缓冲区,这意味着数据库已过时。

更理想的解决方案是在 30 秒计时器上运行任务,或者将 foreach 短路并超时。

我想到了计时器的想法并想出了这个:

syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);

private static void TimerElapsed(object state)
{
    var buffer = new List<int>();
    var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();

    foreach (var value in collection)
    {
        buffer.Add(value);
    }

    ProcessItems(buffer);
    buffer.Clear();
}

这有一个明显的问题,foreach 将被阻止直到结束,从而破坏了计时器的目的。

任何人都可以提供一个方向吗?我基本上需要定期对BlockingCollection 进行快照并处理内容以清除它。也许BlockingCollection 是错误的类型?

【问题讨论】:

    标签: c# multithreading thread-safety task-parallel-library task


    【解决方案1】:

    不要在计时器回调中使用GetConsumingEnumerable,而是使用其中一种方法,将结果添加到列表中,直到它返回false,或者您已经达到令人满意的批量大小。

    BlockingCollection.TryTake Method (T) - 可能是您需要的,您根本不想再等待。

    BlockingCollection.TryTake Method (T, Int32)

    BlockingCollection.TryTake Method (T, TimeSpan)

    您可以轻松地将其提取到扩展中(未经测试):

    public static IList<T> Flush<T>
    (this BlockingCollection<T> collection, int maxSize = int.MaxValue)
    {
         // Argument checking.
    
         T next;
         var result = new List<T>();
    
         while(result.Count < maxSize && collection.TryTake(out next))
         {
             result.Add(next);
         }
    
         return result;
    }
    

    【讨论】:

    • 我选择了这个完美运行的实现。它每 x 秒运行一次,并且只推送可用的内容(无需等待)。谢谢你的帮助。 gist.github.com/AntSwift/d2faa4f43d1a6d594172
    • 现在可以在github.com/AntSwift/AS.BlockingCollectionDemo获得完整的工作版本
    • 我相信您的代码示例会由于计时器机制而受到竞争条件的影响。如果您的消费者无法在定时器周期内完成任务,定时器将触发,而不管是否会产生另一个消费者。
    • @Andrew:这取决于使用的 Timer 实现,这超出了这个答案的范围,这只是关于阻塞集合本身。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-12
    • 2011-10-27
    相关资源
    最近更新 更多