【问题标题】:Mutual Exclusion with Multiple Producers and One Consumer多生产者一消费者互斥
【发布时间】:2026-02-02 05:05:01
【问题描述】:

我有一个有趣的问题需要解决一些生产代码。我们目前正在开发一种 Web 服务,该服务将从许多不同的应用程序中调用,本质上将用于发送电子邮件。每当发送新电子邮件时,我们最终都需要将该电子邮件的收据添加到数据库中,但理想情况下,我们不希望立即执行此操作,因此我们将随着时间的推移建立缓冲区。一旦缓冲区达到一定长度,或者经过足够长的时间后,缓冲区的内容将被刷新到数据库中。

这样想,当一个线程发送电子邮件时,它会锁定缓冲区,以便在不受干扰的情况下添加它的日志并维护线程安全。如果它看到缓冲区有一定的大小(在这个例子中我们会说 1000),那么线程有责任将它全部写入数据库(我认为这是低效的,但我使用 Service Stack 作为我们的 web框架,所以如果有办法委派这项任务,我宁愿采用这种方法)。

现在,由于写入数据库可能很耗时,我们希望添加一个辅助缓冲区以供使用。因此,一旦一个缓冲区已满,所有新请求都会在第一个缓冲区被刷新时将它们的工作记录到第二个缓冲区中。同样,一旦第二个缓冲区已满,所有线程将移回第一个缓冲区,第二个缓冲区将被刷新。

我们需要解决的主要问题:

  • 当一个线程决定它需要刷新其中一个缓冲区时,它需要指示所有新线程开始记录到第二个缓冲区(这应该像更改某些变量或指针以指向空缓冲区一样简单)
  • 如果在临界区的当前用户决定刷新日志时当前有线程阻塞,则需要重新激活所有阻塞线程并将它们指向第二个缓冲区

我更关心第二个要点。重新唤醒所有阻塞线程的最佳方法是什么,而不是让它们进入第一个缓冲区的临界区,而是让它们尝试为空的线程获得锁?

编辑

根据下面的 cmets,我想出了一些我认为可行的方法。我不知道存在线程安全数据结构。

    private readonly ConcurrentQueue<EmailResponse> _logBuffer = new ConcurrentQueue<EmailResponse>();
    private readonly object _lockobject = new object();
    private const int BufferThreshold = 1000;

    public void AddToBuffer(EmailResponse email)
    {
        _logBuffer.Enqueue(email);

        Monitor.Enter(_lockobject);
        if (_logBuffer.Count >= BufferThreshold)
            Task.Run(async () =>
            {
                EmailResponse response;
                for (var i = 0; i < BufferThreshold; i++)
                    if (_logBuffer.TryDequeue(out response))
                        await AddMail(response);
                Monitor.Exit(_lockobject);
            });
        else Monitor.Exit(_lockobject);
    }

【问题讨论】:

    标签: c# multithreading web-services mutual-exclusion


    【解决方案1】:

    我不确定您是否需要第二个缓冲区; ConcurrentQueue 让我觉得是解决您问题的好方法。每个线程都可以在没有冲突的情况下入队,如果任何线程注意到队列的 Count 高于魔法阈值,即使有更多线程入队,您也可以安全地出队最多那么多对象。

    我制作的一个(非常快速和肮脏的)工作样本:

    static class Buffer
    {
        private const int c_MagicThreshold = 10;
        private static ConcurrentQueue<string> s_Messages = new ConcurrentQueue<string>();
        private static object s_LockObj = new object();
    
        public static void Enqueue(string message)
        {
            s_Messages.Enqueue(message);
            // try to flush every time; spawn on a non-blocking thread and immediately return
            new Task(Flush).Start();
        }
    
        public static void Flush()
        {
            // do we flush at all?
            if (s_Messages.Count >= c_MagicThreshold)
            {
                lock (s_LockObj)
                {
                    // make sure another thread didn't flush while we were waiting
                    if (s_Messages.Count >= c_MagicThreshold)
                    {
                        List<string> messages = new List<string>();
                        Console.WriteLine("Flushing " + c_MagicThreshold + " messages...");
                        for (int i = 0; i < c_MagicThreshold; i++)
                        {
                            string message;
                            if (!s_Messages.TryDequeue(out message))
                            {
                                throw new InvalidOperationException("How the hell did you manage that?");
                                // or just break from the loop if you don't care much, you spaz
                            }
                            messages.Add(message);
                        }
                        Console.WriteLine("[ " + String.Join(", ", messages) + " ]");
    
                        // number of new messages enqueued between threshold pass and now
                        Console.WriteLine(s_Messages.Count + " messages remaining in queue");
                    }
                }
            }
        }
    }
    

    测试调用:

    Parallel.For(0, 30, (i) =>
    {
        Thread.Sleep(100);  // do other things
        Buffer.Enqueue(i.ToString());
    });
    

    测试运行的控制台输出:

    正在刷新 10 条消息...

    [ 28, 21, 14, 0, 7, 29, 8, 15, 1, 22 ]

    队列中剩余 5 条消息

    正在刷新 10 条消息...

    [16、3、9、2、23、17、10、4、24、5]

    队列中剩余 1 条消息

    正在刷新 10 条消息...

    [11、18、25、19、26、12、6、20、13、27]

    0 条消息留在队列中

    【讨论】:

    • 如果清空需要时间,您可能需要注意是否有多个线程注意到队列高于魔法阈值,否则这似乎是一个不错的实现选择。 +l
    • 如果是这种情况,那么您可以使用基本对象并与该共享对象建立锁定来刷新缓冲区。从那时起,您可以假设如果缓冲区实际上被锁定,则它正在被刷新,然后继续。否则,您将继续沿着锁定路径前进。
    • @SamHolder 没有提到,但是是的,绝对需要考虑这种极端情况。使用示例实现编辑了我的答案,该示例实现检查锁的两侧以解决该问题。
    • 我真的很喜欢这个解决方案!我看到的唯一可能是一个问题是,如果刷新需要很长时间,您可能会遇到一些相当多的线程都看到这个并在刷新时锁定的问题。当然,一旦他们进入,他们就会看到缓冲区被清除并且线程将死亡,但这仍然可能对服务器造成压力,具体取决于创建和阻塞了多少“刷新”线程
    • @DillonDrobena 是的,尽管也有多种解决方法。例如,您可以在进入锁定之后将私有布尔值设置为 true,在退出之前将其设置为 false,然后在尝试锁定之前检查该布尔值是否为 false。这样,线程就有很好的机会看到锁已启用并立即退出,而不是尝试锁定自己。
    【解决方案2】:

    你能给每个线程一个对象来保存两个缓冲区并让线程记录到这个对象吗?当每个线程要求它记录某些内容时,该对象将决定写入哪个缓冲区。该对象还可能负责清空数据库的完整缓冲区,而不是阻止线程写入。

    【讨论】:

      最近更新 更多