【发布时间】:2014-07-03 21:20:16
【问题描述】:
我正在尝试实现一个具有多个生产者和一个消费者的相当简单的生产者/消费者风格的应用程序。
研究让我找到了BlockingCollection<T>,它很有用,让我能够实现一个长期运行的消费者任务,如下所示:
var c1 = Task.Factory.StartNew(() =>
{
var buffer = new List<int>(BATCH_BUFFER_SIZE);
foreach (var value in blockingCollection.GetConsumingEnumerable())
{
buffer.Add(value);
if (buffer.Count == BATCH_BUFFER_SIZE)
{
ProcessItems(buffer);
buffer.Clear();
}
}
});
ProcessItems 函数将缓冲区提交到数据库,并且它确实分批工作。然而,这个解决方案是次优的。在男爵生产期间,可能需要一段时间才能填充缓冲区,这意味着数据库已过时。
更理想的解决方案是在 30 秒计时器上运行任务,或者将 foreach 短路并超时。
我想到了计时器的想法并想出了这个:
syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);
private static void TimerElapsed(object state)
{
var buffer = new List<int>();
var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();
foreach (var value in collection)
{
buffer.Add(value);
}
ProcessItems(buffer);
buffer.Clear();
}
这有一个明显的问题,foreach 将被阻止直到结束,从而破坏了计时器的目的。
任何人都可以提供一个方向吗?我基本上需要定期对BlockingCollection 进行快照并处理内容以清除它。也许BlockingCollection 是错误的类型?
【问题讨论】:
标签: c# multithreading thread-safety task-parallel-library task