【问题标题】:Why does iterating over GetConsumingEnumerable() not fully empty the underlying blocking collection为什么迭代 GetConsumingEnumerable() 不能完全清空底层阻塞集合
【发布时间】:2012-04-29 19:14:51
【问题描述】:

我在尝试创建简单管道时使用任务并行库BlockingCollection<T>ConcurrentQueue<T>GetConsumingEnumerable 遇到了一个可量化且可重复的问题。

简而言之,将条目添加到来自一个线程的默认 BlockingCollection<T>(在底层依赖于 ConcurrentQueue<T>)并不能保证它们会从另一个调用GetConsumingEnumerable() 方法。

我创建了一个非常简单的 Winforms 应用程序来重现/模拟它,它只是将整数打印到屏幕上。

  • Timer1 负责对工作项进行排队...它使用一个名为 _tracker 的并发字典,以便知道它已经添加到阻塞集合中的内容。
  • Timer2 只是记录BlockingCollection_tracker 的计数状态
  • START 按钮启动 Paralell.ForEach,它简单地遍历阻塞集合 GetConsumingEnumerable() 并开始将它们打印到第二个列表框。
  • STOP 按钮停止Timer1,防止将更多条目添加到阻塞集合中。
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for(var i = 0; i < 3; i++,Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
            _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }

这是事件的顺序:

  • 按开始
  • Timer1 滴答和 ListBox1 立即更新 3 条消息(添加 0、1、2)
  • ListBox2 随后更新了 3 条消息,间隔 1 秒
    • 处理0
    • 处理 1
    • 处理 2
  • Timer1 滴答声 & ListBox1 立即更新 3 条消息(添加 3、4、5)
  • ListBox2 用 2 条消息随后更新,间隔 1 秒
    • 处理 3
    • 处理4
    • Processing 5 未打印...似乎已“丢失”
  • 按 STOP 以防止计时器 1 添加更多消息
  • 等等...“正在处理 5”仍然没有出现

您可以看到并发字典仍在跟踪 1 项尚未处理并随后从 _tracker 中删除

如果我再次按 Start,则 timer1 开始添加更多 3 个条目,并行循环恢复活力,打印 5、6、7 和 8。

我完全不知道为什么会发生这种情况。再次调用 start 显然调用了一个 newtask,它调用了一个 Paralell foreach,并重新执行 GetConsumingEnumerable(),它神奇地找到了丢失的条目......我

为什么BlockingCollection.GetConsumingEnumerable() 不能保证遍历添加到集合中的每个项目。

为什么添加更多条目随后会导致它“解开”并继续处理?

【问题讨论】:

标签: c# .net wpf task-parallel-library blockingcollection


【解决方案1】:

从 .net 4.5 开始,您可以创建一个一次只占用 1 个项目的分区器:

var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) }, (batch, state) => {//do stuff}

https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs.110).aspx

【讨论】:

  • 我确认,从 .NET 4.5 开始,这是首选选项。
  • 接受的答案是有用的。这似乎是最新的。
【解决方案2】:

为了清楚起见,我觉得我应该注意,如果您能够在执行 Parallel.foreach 之前调用 BlockingCollection 的 .CompleteAdding() 方法,那么您在上面描述的问题将不成问题。我已经多次将这两个对象一起使用,效果很好。

此外,您可以在调用 CompleteAdding() 后随时重新设置 BlockingCollection,以便在需要时添加更多项目 (_entries = new BlockingCollection();)

如果您多次单击开始和停止按钮,将上面的点击事件代码更改如下将解决您缺少条目的问题并使其按预期工作:

private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
    timer1.Stop();
    timer1.Enabled = false;
>>>>_entries.CompleteAdding();
>>>>_entries = new BlockingCollection<int>();
}

【讨论】:

    【解决方案3】:

    您不能在Parallel.ForEach() 中使用GetConsumingEnumerable()

    使用TPL extras中的GetConsumingPartitioner

    在博文中你也会得到解释为什么不能使用GetConsumingEnumerable()

    Parallel.ForEach 和 PLINQ 默认采用的分区算法使用分块来最小化同步成本:不是每个元素获取一次锁,而是获取锁,获取一组元素(一个块) ,然后释放锁。

    即Parallel.ForEach 等到它收到一组工作项后再继续。正是您的实验所显示的。

    【讨论】:

    • TPL extras 在 MS-LPL 许可下,这意味着如果您使用它们,您会将整个衍生作品锁定到 Windows。它不是 OSI 批准的许可证...
    • @Daniel,很高兴知道。感谢更新。您是否碰巧知道非 Windows TPL 是否也默认使用分组分区器
    【解决方案4】:

    我无法使用简单的控制台应用程序复制您的行为,而该应用程序基本上做同样的事情(在 .Net 4.5 beta 上运行,这可能会有所作为)。但我认为发生这种情况的原因是Parallel.ForEach() 试图通过将输入集合分成块来优化执行。并且使用您的可枚举,在您向集合中添加更多项目之前,无法创建一个块。如需更多信息,请参阅Custom Partitioners for PLINQ and TPL on MSDN

    要解决此问题,请勿使用Parallel.ForEach()。如果您仍想并行处理项目,您可以在每次迭代中启动Task

    【讨论】:

    • 感谢您的检查。 parallel.foreach 提供的“不错”功能之一是能够为我限制 MaxDegreesOfParallelism(现实世界的版本是调用 WCF 服务)。如果我只是在每次迭代中通过正常的 foreach 更新任务,你会建议我如何限制并发任务的最大数量。
    猜你喜欢
    • 1970-01-01
    • 2012-11-20
    • 1970-01-01
    • 1970-01-01
    • 2012-10-19
    • 1970-01-01
    • 2011-02-05
    • 2017-01-26
    • 2014-01-02
    相关资源
    最近更新 更多