【问题标题】:How to enqueue and dequeue a lot at the same time?如何同时入队和出队?
【发布时间】:2017-10-18 15:10:15
【问题描述】:

我有两个线程的简单场景,其中第一个线程永久读取一些数据并将该数据排入队列。第二个线程首先从该队列中查看单个对象并进行一些条件检查。如果这些都很好,则单个对象将出列并传递给某些处理。

我尝试使用ConcurrentQueue,它是一个简单队列的线程安全实现,但是这个问题是所有调用都被阻塞了。这意味着如果第一个线程将对象入队,则第二个线程无法查看或出队对象。

在我的情况下,我需要同时从队列的开头入队和出队。

C#的lock语句也会。

所以我的问题是是否可以并行执行这两个操作而不会以线程安全的方式相互阻塞。

这是我的第一次尝试,这是我的问题的类似示例。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Scenario {
    public class Program {
        public static void Main(string[] args) {
            Scenario scenario = new Scenario();
            scenario.Start();
            Console.ReadKey();
        }

        public class Scenario {
            public Scenario() {
                someData = new Queue<int>();
            }

            public void Start() {
                Task.Factory.StartNew(firstThread);
                Task.Factory.StartNew(secondThread);
            }

            private void firstThread() {
                Random random = new Random();
                while (true) {
                    int newData = random.Next(1, 100);
                    someData.Enqueue(newData);
                    Console.WriteLine("Enqueued " + newData);
                }
            }

            private void secondThread() {
                Random random = new Random();
                while (true) {
                    if (someData.Count == 0) {
                        continue;
                    }

                    int singleData = someData.Peek();

                    int someValue = random.Next(1, 100);
                    if (singleData > someValue || singleData == 1 || singleData == 99) {
                        singleData = someData.Dequeue();
                        Console.WriteLine("Dequeued " + singleData);
                        // ... processing ...
                    }
                }
            }

            private readonly Queue<int> someData;
        }
    }
}

第二个例子:

public class Scenario {
    public Scenario() {
        someData = new ConcurrentQueue<int>();
    }

    public void Start() {
        Task.Factory.StartNew(firstThread);
        Task.Factory.StartNew(secondThread);
    }

    private void firstThread() {
        Random random = new Random();
        while (true) {
            int newData = random.Next(1, 100);
            someData.Enqueue(newData);
            lock (syncRoot) { Console.WriteLine($"Enqued {enqued++} Dequed {dequed}"); }
        }
    }

    private void secondThread() {
        Random random = new Random();
        while (true) {
            if (!someData.TryPeek(out int singleData)) {
                continue;
            }

            int someValue = random.Next(1, 100);
            if (singleData > someValue || singleData == 1 || singleData == 99) {
                if (!someData.TryDequeue(out singleData)) {
                    continue;
                }

                lock (syncRoot) { Console.WriteLine($"Enqued {enqued} Dequed {dequed++}"); }

                // ... processing ...
            }
        }
    }

    private int enqued = 0;
    private int dequed = 0;

    private readonly ConcurrentQueue<int> someData;

    private static readonly object syncRoot = new object();
}

【问题讨论】:

  • 您可以使用第三个线程来处理队列的所有工作,另外两个线程与第三个线程通信,它们不直接使用队列
  • @EhsanZargarErshadi 除了为一个没有任何生产力的额外线程消耗更多系统资源之外,这并没有为您带来任何好处。
  • 为什么同时添加和删除对您很重要?通常,产生或使用该值所需的处理比实际从队列中添加或删除项目所需的时间显着昂贵,因此锁争用往往较低。您在实践中实际上遇到了哪些问题,导致无法接受简单地同步访问队列?
  • @Servy 我不同意。我已经用并发队列和一些计数器重写了我的示例,并且 dequeing 在某个数字处停止,例如 260 次。在那之后,只有 enquing 正在发生。已编辑的帖子。
  • @StudentBanana 您还同步了整个处理过程。这意味着在两者之间实际上没有任何工作可以并行完成。如果您实际上并行处理项目,则同步访问队列不会有问题。

标签: c# .net queue thread-safety nonblocking


【解决方案1】:

首先:我强烈鼓励您重新考虑拥有多个线程和共享内存数据结构的技术是否是正确的方法。具有多个控制线程共享对数据结构的访问权限的代码很难正确处理,而且故障可能是微妙的、灾难性的并且难以调试。

第二:如果您偏爱多线程和共享内存数据结构,我强烈鼓励您使用由专家设计的数据类型,例如并发队列,而不是自行开发。

现在我已经消除了这些警告:这是解决您的问题的一种方法。如果您这样做,您应该获得 C# 内存模型专家的服务来验证您的解决方案的正确性,这已经足够复杂了。如果没有真正是内存模型专家的人的帮助,我不会认为自己有能力实施我将要描述的方案。

目标是拥有一个支持同时入队和出队操作以及低锁争用的队列。

你想要的是两个不可变的堆栈变量,分别称为enqueue堆栈和dequeue堆栈,每个堆栈都有自己的锁。

入队操作是:

  • 获取入队锁
  • 将项目推入入队堆栈;这会在 O(1) 时间内生成一个新堆栈。
  • 将新生成的堆栈分配给入队堆栈变量。
  • 释放入队锁

出队操作是:

  • 获取出队锁
  • 如果出队堆栈为空,则
    • 获取入队锁
    • 枚举入队堆栈并使用它来构建出队堆栈;这反转入队堆栈,它维护了我们想要的属性:先进先出。
    • 将一个空的不可变堆栈分配给入队堆栈变量
    • 释放入队锁
    • 将新堆栈分配给出队堆栈
  • 如果出队堆栈为空,则抛出或放弃并稍后重试,或休眠直到收到入队操作的信号,或任何正确的做法。
  • 出队堆栈不为空。
  • 从出队堆栈中弹出一个项目,这会在 O(1) 中生成一个新堆栈。
  • 将新堆栈分配给出队堆栈变量。
  • 释放出队锁。
  • 处理项目。

请注意,当然如果只有一个线程出队,那么我们根本不需要出队锁,但是使用这种方案可以有很多线程出队。

假设入队堆栈上有 1000 个项目,而出队堆栈上有 0 个项目。当我们第一次出队时,我们做了一个昂贵的 O(n) 操作来反转入队堆栈一次,但现在我们在出队堆栈上有 1000 个项目。一旦出队堆栈很大,出队线程可以将大部分时间用于处理,而入队线程将大部分时间用于入队。入队锁的争用很少发生,但发生时代价高昂

为什么要使用不可变数据结构?我在这里描述的所有内容也适用于可变堆栈,但是(1)更容易推断不可变堆栈,(2)如果你想真正危险地生活,你可以省略一些锁并进行互锁交换操作;如果您这样做,请确保您了解所有关于在低锁定条件下可能的操作重新排序。

更新:

真正的问题是我不能出列和处理很多点,因为我一直在读取和获取新点。入队调用阻塞了处理步骤。

如果这是你真正的问题,那么在问题中提及而不是将其埋在评论中是个好主意。帮助我们帮助您。

您可以在这里做很多事情。例如,您可以将入队线程的优先级设置为低于出队线程的优先级。或者您可以有多个出队线程,与您机器中的 CPU 数量一样多。或者,如果出队没有跟上,您可以动态选择删除一些入队操作。如果对您的实际问题有更多了解,就很难就如何解决它提出建议。

【讨论】:

  • 不错的答案。使用两个不可变队列而不是两个不可变堆栈不是更好吗?这样,从写队列到读队列的移动可以在 O(1) 最坏的情况下执行。
  • @Fede:这是个好主意,但如果它是一个持久不可变队列,那么它可能实现为两个不可变堆栈,所以你所做的就是解决问题。我描述这样的机制的目的是要对 几乎总是在某个地方进行昂贵的操作这一事实提供一些见解;例如,可变队列使用 double-when-full 算法,这也会产生一个偶然的 O(n) 步,该步会在 O(n) 便宜步上摊销。
  • 通常我们认为昂贵的步骤是不利的,但在这种情况下,我们可以利用它们来发挥优势,因为我们将许多频繁的小争用减速转变为不经常的大减速,这通常是一个净赢。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-03-14
  • 1970-01-01
  • 2013-07-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多