【发布时间】:2012-06-29 05:04:49
【问题描述】:
这是我的场景。我从外部数据源获取大量数据,我必须在两个地方本地写入。其中一个目的地的写入速度非常慢,但另一个目的地非常快(但我不能依靠它来读取和写入缓慢的目的地)。为此,我使用了生产者-消费者模式(使用 BlockingCollection)。
我现在遇到的问题是我必须在两个 BlockingCollection 中对数据进行排队,这会占用太多内存。我的代码看起来与下面的示例非常相似,但我真的很想从一个队列中驱动两个任务。有谁知道这样做的正确方法是什么?下面的代码有什么低效率的地方吗?
class Program
{
const int MaxNumberOfWorkItems = 15;
static BlockingCollection<int> slowBC = new BlockingCollection<int>(MaxNumberOfWorkItems);
static BlockingCollection<int> fastBC = new BlockingCollection<int>(MaxNumberOfWorkItems);
static void Main(string[] args)
{
Task slowTask = Task.Factory.StartNew(() =>
{
foreach (var item in slowBC.GetConsumingEnumerable())
{
Console.WriteLine("SLOW -> " + item);
Thread.Sleep(25);
}
});
Task fastTask = Task.Factory.StartNew(() =>
{
foreach (var item in fastBC.GetConsumingEnumerable())
{
Console.WriteLine("FAST -> " + item);
}
});
// Population two BlockingCollections with the same data. How can I have a single collection?
for (int i = 0; i < 100; i++)
{
while (slowBC.TryAdd(i) == false)
{
Console.WriteLine("Wait for slowBC...");
}
while (fastBC.TryAdd(i) == false)
{
Console.WriteLine("Wait for 2...");
}
}
slowBC.CompleteAdding();
fastBC.CompleteAdding();
Task.WaitAll(slowTask, fastTask);
Console.ReadLine();
}
}
【问题讨论】:
-
不是一个真正的答案,而是 FWIW,另一种方法是使用 TPL DataFlow,您可以将 BroadcastBlock 发送到 2 个不同的 ActionBlock,用于 2 个不同的写入路径。这种方法的主要好处是不必手动管理“链接”集合。
-
同样,当然,你可以使用 Rx,创建一个 observable,并拥有 2 个订阅者 AFAICT。
-
这是两个非常好的主意,但我必须坚持使用 .NET 4.0(并且没有 RX)中可用的内容...... :(
-
您在运行时阻塞行为方面的目标是什么?例如,考虑到速度差异,另一种选择是只使用那个 foreach 并让它的主体先写入快的,然后再写入慢的。这会很好,还是让速度快的人“跑在前面”很重要?
-
一个简单的解决方案是为每个项目启动两个任务。这意味着您将在物品可用时尽快使用它们。另一种选择是同步更新缓存并将数据库更新作为任务启动。这将在缓存允许的情况下以最快的速度消耗项目。
标签: c# .net multithreading task-parallel-library