【发布时间】:2012-04-29 19:14:51
【问题描述】:
我在尝试创建简单管道时使用任务并行库BlockingCollection<T>、ConcurrentQueue<T> 和GetConsumingEnumerable 遇到了一个可量化且可重复的问题。
简而言之,将条目添加到来自一个线程的默认 BlockingCollection<T>(在底层依赖于 ConcurrentQueue<T>)并不能保证它们会从另一个调用GetConsumingEnumerable() 方法。
我创建了一个非常简单的 Winforms 应用程序来重现/模拟它,它只是将整数打印到屏幕上。
-
Timer1负责对工作项进行排队...它使用一个名为_tracker的并发字典,以便知道它已经添加到阻塞集合中的内容。 -
Timer2只是记录BlockingCollection和_tracker的计数状态 - START 按钮启动
Paralell.ForEach,它简单地遍历阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框。 - STOP 按钮停止
Timer1,防止将更多条目添加到阻塞集合中。
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
这是事件的顺序:
- 按开始
- Timer1 滴答和 ListBox1 立即更新 3 条消息(添加 0、1、2)
- ListBox2 随后更新了 3 条消息,间隔 1 秒
- 处理0
- 处理 1
- 处理 2
- Timer1 滴答声 & ListBox1 立即更新 3 条消息(添加 3、4、5)
- ListBox2 用 2 条消息随后更新,间隔 1 秒
- 处理 3
- 处理4
- Processing 5 未打印...似乎已“丢失”
- 按 STOP 以防止计时器 1 添加更多消息
- 等等...“正在处理 5”仍然没有出现
您可以看到并发字典仍在跟踪 1 项尚未处理并随后从 _tracker 中删除
如果我再次按 Start,则 timer1 开始添加更多 3 个条目,并行循环恢复活力,打印 5、6、7 和 8。
我完全不知道为什么会发生这种情况。再次调用 start 显然调用了一个 newtask,它调用了一个 Paralell foreach,并重新执行 GetConsumingEnumerable(),它神奇地找到了丢失的条目......我
为什么BlockingCollection.GetConsumingEnumerable() 不能保证遍历添加到集合中的每个项目。
为什么添加更多条目随后会导致它“解开”并继续处理?
【问题讨论】:
-
谢谢大家。你们都为我指明了正确的方向,这使我朝着这个方向前进。 @Svick 这可能就是为什么它在 .net4.5 beta connect.microsoft.com/VisualStudio/feedback/details/674705/… 中不是问题的原因,MS Parallel 团队的 steven toup 实际上已经在博客上讨论了这个问题。 blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx
标签: c# .net wpf task-parallel-library blockingcollection