【问题标题】:Is continuation with optional lock subject to a race condition?可选锁的延续是否受到竞争条件的影响?
【发布时间】:2013-01-26 17:44:08
【问题描述】:

我刚刚开始使用任务并行库。所讨论的任务是以尽可能并行的方式处理结果,但要保持结果的顺序。

此外,可以随时添加项目,直到设置标志表示不再接受任何项目。

此外,一旦所有结果完成后,一些客户将需要收到通知(只有在没有更多项目被接受时才会发生这种情况)。

我想出了下面的简化示例,它似乎在我的所有测试中都运行良好。

class Program
{
    static void Main(string[] args)
    {
        for (int i = 1; i < random.Next(5, 21); ++i)
        {
            AddItem(i);
        }

        finishedAddingItems = true;

        completion.Task.Wait();
        Console.WriteLine("Finished");
        Console.ReadKey();
    }

    static TaskCompletionSource<bool> completion = 
                            new TaskCompletionSource<bool>();

    static bool finishedAddingItems = false;

    static Random random = new Random();

    class QueueResult
    {
        public int data;
        public int IsFinished;
    }

    static ConcurrentQueue<QueueResult> queue = 
                           new ConcurrentQueue<QueueResult>();

    static object orderingLockObject = new object();

    static void AddItem(int i)
    {
        var queueItem = new QueueResult { data = i, IsFinished = 0 };

        queue.Enqueue(queueItem);

        Task.Factory
            .StartNew(() => 
            { 
                for (int busy = 0; 
                     busy <= random.Next(9000000, 90000000); 
                     ++busy) 
                { }; 
                Interlocked.Increment(ref queueItem.IsFinished); 
            })
            .ContinueWith(t =>
            {
                QueueResult result;

                //the if check outside the lock is to avoid tying up resources
                //needlessly, since only one continuation can actually process
                //the queue at a time.
                if (queue.TryPeek(out result) 
                    && result.IsFinished == 1)
                {
                    lock (orderingLockObject)
                    {
                        while (queue.TryPeek(out result) 
                               && result.IsFinished == 1)
                        {
                            Console.WriteLine(result.data);
                            queue.TryDequeue(out result);
                        }

                        if (finishedAddingItems && queue.Count == 0)
                        {
                            completion.SetResult(true);
                        }
                    }
                }
            });
    }
}

但是,我无法说服自己是否存在可能无法处理项目的潜在竞争条件?

【问题讨论】:

  • 不,我目前卡在 .NET 4.0
  • @Hans 我 am 使用 ConcurrentQueue。所以我不确定你的评论是否有效。
  • @Hans 另外,如果我正确理解了您对热等待循环的评论,我实际上没有忙于等待,这就是重点。 for (int busy = 0; ... 循环只是为了示例的目的模拟工作。
  • @HansPassant TaskCompletionSource 不会代替 AutoResetEvent 吗?

标签: .net concurrency task-parallel-library


【解决方案1】:

我认为您的代码可能无法正常工作,因为您没有将IsFinished 声明为volatile,而是直接在锁之外访问它。无论如何,正确使用double-checked locking 是很困难的,所以除非你真的必须这样做,否则你不应该这样做。

此外,您的代码也很混乱(将所有内容都放在一个类中,使用int 而不是bool,不必要的ContinueWith(),...)并且至少包含一个线程安全问题(Random不是线程安全的)。

因此,我建议您了解 TPL 更高级的部分。就您而言,PLINQ 听起来是正确的解决方案:

var source = Enumerable.Range(1, random.Next(5, 21)); // or some other collection

var results = source.AsParallel()
                    .AsOrdered()
                    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                    .Select(i => /* perform your work here */);

foreach (int i in results)
    Console.WriteLine(i);

【讨论】:

  • 感谢您的回答。问题在于(除非我误解)这假设我一开始就知道结果的大小。哪个,我不像问题一开始所说的那样。我正在修改示例代码以使其更清晰。
  • @Nathan 在这种情况下,您可以将BlockingCollectionGetConsumingEnumerable() 一起用作PLINQ 查询的源,而不是Range()。换句话说,这并不假设您知道集合的大小,只是假设它是一个IEnumerable。我已经澄清了代码。
  • 您的 PLINQ 示例结合 BlockingCollection 和 GetConsumingEnumerable() 完全符合我的需要,甚至提供了一种指示不再添加结果的方法。更好的是,它的运行效率也比我的第一次代码尝试高得多。
猜你喜欢
  • 1970-01-01
  • 2022-01-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多