【问题标题】:Mutex when writing to queue held in map for thread safety为线程安全写入映射中保存的队列时的互斥锁
【发布时间】:2026-01-26 15:10:01
【问题描述】:

我有一个map<int, queue<int>>,其中有一个线程写入其中,即将消息推送到队列中。它们的键是指client_id,队列为客户端保存消息。我希望使这个读写线程安全。

目前,写入它的线程会做这样的事情

map<int, queue<int>> msg_map;
if (msg_map.find(client_id) != msg_map.end())
{
    queue<int> dummy_queue;
    dummy_queue.push(msg); //msg is an int
    msg_map.insert(make_pair(client_id, dummy_queue);
}
else
{
    msg_map[client_id].push(msg);
}

有许多客户正在阅读并删除此地图。

if (msg_map.find(client_id) != msg_map.end())
{
    if (!msg_map.find(client_id)->second.empty())
    {
        int msg_rxed = msg_map[client_id].front();

        //processing message

        msg_map[client_id].pop();
    }
}

我正在阅读关于互斥锁的this(以前没有使用过它们),我想知道应该在何时何地锁定互斥锁。我的困惑在于他们正在访问单独的队列(保存在同一个地图中)。我是锁定队列还是锁定地图?

是否有标准/公认的方式来执行此操作 - 并且使用互斥锁是执行此操作的最佳方式?有 0 个客户端线程,只有 1 个单个写入线程。

【问题讨论】:

    标签: c++ stl thread-safety mutex


    【解决方案1】:

    简化和优化您的代码

    现在我们不关心互斥体,稍后我们会在代码稍微清理一下时处理它(那样会更容易)。

    首先,从您展示的代码看来,似乎没有理由使用有序的std::map(对数复杂度),您可以使用效率更高的std::unordered_map(平均恒定时间复杂度)。选择完全取决于您,如果您不需要订购容器,您只需更改其声明:

    std::map<int, std::queue<int>> msg_map;
    // or
    std::unordered_map<int, std::queue<int>> msg_map; // C++11 only though
    

    现在,地图在设计上非常高效,但如果您坚持对每个操作进行查找,那么您将失去地图的所有优势。

    关于编写器线程,所有您的代码块(用于编写器)可以被这一行有效地替换:

    msg_map[client_id].push(msg);
    

    请注意,std::mapstd::unordered_mapoperator[] 定义为:

    使用key 作为键和默认构造的映射值将新元素插入容器,并返回对新构造的映射值的引用。如果具有key 键的元素已经存在,则不执行插入并返回对其映射值的引用。

    关于您的阅读器线程,您不能直接使用operator[],因为如果特定client_id 当前不存在,它将创建一个新条目,因此您需要按顺序缓存find 返回的迭代器重用它,从而避免无用的查找:

    auto iter = msg_map.find(client_id);
    // iter will be either std::map<int, std::queue<int>>::iterator
    //                  or std::unordered_map<int, std::queue<int>>::iterator
    if (iter != msg_map.end()) {
        std::queue<int>& q = iter->second;
        if (!q.empty()) {
            int msg = q.front();
            q.pop();
            // process msg
        }
    }
    

    我之所以在处理它之前立即pop消息,是因为当我们添加互斥锁时它会提高并发性(我们可以更快地解锁互斥锁,这总是好的)。

    使代码线程安全

    @hmjd 关于多个锁(一个用于地图,一个用于每个队列)的想法很有趣,但根据您向我们展示的代码,我不同意:您从额外并发中获得的任何好处很可能会被否定锁定队列互斥体所需的额外时间(实际上,锁定互斥体是一个非常昂贵的操作),更不用说您必须处理的额外代码复杂性。我会把钱押在一个更高效的互斥锁上(同时保护地图和所有队列)。

    顺便说一下,如果您想使用更高效的std::unordered_mapstd::map 不会遇到这个问题),单个互斥锁可以解决迭代器失效问题。

    假设 C++11,只需声明 std::mutex 以及您的地图:

    std::mutex msg_map_mutex;
    std::map<int, std::queue<int>> msg_map; // or std::unordered_map
    

    保护 writer 线程非常简单,只需在访问 map 之前锁定 mutex:

    std::lock_guard<std::mutex> lock(msg_map_mutex);
    // the lock is held while the lock_guard object stays in scope
    msg_map[client_id].push(msg);
    

    保护阅读器线程几乎没有任何困难,唯一的技巧是您可能希望尽快解锁互斥锁以提高并发性,因此您必须使用std::unique_lock(可以提前解锁)代替的std::lock_guard(只有在超出范围时才能解锁):

    std::unique_lock<std::mutex> lock(msg_map_mutex);
    auto iter = msg_map.find(client_id);
    if (iter != msg_map.end()) {
        std::queue<int>& q = iter->second;
        if (!q.empty()) {
            int msg = q.front();
            q.pop();
            // assuming you don't need to access the map from now on, let's unlock
            lock.unlock();
            // process msg, other threads can access the map concurrently
        }
    }
    

    如果您不能使用 C++11,则必须替换 std::mutex 等。与您的平台提供的任何东西(pthreads、Win32、...)或 boost 等效项(与新的 C++11 类一样具有可移植性和易于使用的优点,这与特定于平台的原语不同)。

    【讨论】:

      【解决方案2】:

      mapqueue 的读写访问需要同步,因为这两个结构都在修改,包括map

      map<int, queue<int>> msg_map;
      if (msg_map.find(client_id) != msg_map.end())
      {
          queue<int> dummy_queue;
          dummy_queue.push(msg); //msg is an int
          msg_map.insert(make_pair(client_id, dummy_queue);
      }
      else
      {
          msg_map[client_id].push(msg); // Modified here.
      }
      

      两个选项是mutex,它同时锁定mapqueue,或者对于mapmutex 每个queue 有一个互斥锁。第二种方法更可取,因为它减少了持有单个锁的时间长度,并且意味着多个线程可以同时更新多个队列。

      【讨论】: