【问题标题】:Are there any concurrent containers in C++11? [closed]C++11 中有并发容器吗? [关闭]
【发布时间】:2011-12-10 16:02:44
【问题描述】:

特别是,我正在寻找一个阻塞队列。 C ++ 11中有这样的事情吗?如果没有,我的其他选择是什么?我真的不想再自己下降到线程级别了。太容易出错了。

【问题讨论】:

  • +1,有趣的 Q.Scott Meyers 在 C++0x 天问过这个问题here。知道 C++11 之后这发生了怎样的变化会很有趣。
  • 使用原语很容易将标准队列变成阻塞队列

标签: c++ visual-studio-2010 concurrency c++11 blockingqueue


【解决方案1】:

According to Diego Dagum from Microsoft's Visual C++ Team:

一个经常出现的问题(嗯,是众多问题之一)是关于 STL 容器的 以及它们是否是线程安全的。

以斯蒂芬的话来说,事实是他们不是,不是作为一个 错误,但作为一个特性:拥有每个 STL 的每个成员函数 容器获取内部锁会破坏性能。作为 一个通用的、高度可重用的库,它实际上不会 提供正确性:放置锁的正确级别是 由程序正在做什么决定。从这个意义上说,个人 成员函数往往不是那么正确的级别。

The Parallel Patterns Library (PPL) 包括几个容器,它们提供对其元素的线程安全访问:

  • concurrent_vector Class 是一个允许随机访问任何元素的序列容器类。它支持并发安全追加、元素访问、迭代器访问和迭代器遍历操作。
  • concurrent_queue Class 是一个序列容器类,它允许对其元素进行先进先出的访问。它支持一组有限的并发安全操作,例如 push 和 try_pop 等等。

一些样本here

也很有趣:http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

【讨论】:

  • 是的。但问题是 - 它仅适用于 Windows =(
  • 我不认为 concurrent_queue 类是严格的 FIFO:"在这里,并发安全意味着指针或迭代器总是有效的。它不是元素初始化或特定遍历的保证订单。” docs.microsoft.com/en-us/cpp/parallel/concrt/…
【解决方案2】:

我很惊讶没有人提到moodycamel::ConcurrentQueue。我们使用它已经有一段时间了,它的表现非常好。特别是它的实现是无锁的,立即带来了巨大的速度。其他使用原因(引自官网):

C++ 中没有多少成熟的无锁队列。促进 有一个,但仅限于具有简单赋值运算符的对象 例如,微不足道的析构函数。英特尔的 TBB 队列不是 无锁,并且也需要简单的构造函数。有很多 在 C++ 中实现无锁队列的学术论文,但可用 源代码很难找到,测试更是如此。

hereherehere 提供了一些基准和比较。

警告:在多个生产者的情况下,弹出元素的顺序不能保证与推送元素的顺序相同(@IgorLevicki),因此如果您需要此保证,请寻找其他选项。

【讨论】:

  • moodycamel 实现的问题在于它不是先进先出(即弹出元素的顺序不保证与推送元素的顺序相同),所以它不是一个通用的解决方案。跨度>
  • @IgorLevicki 是的,你是对的。提供的唯一保证是来自同一生产者的元素将保持其相对顺序。
【解决方案3】:

C++11 中没有并发容器。

但以下头类使用 std::deque 提供并发队列、堆栈和优先级容器。

BlockingCollection 是一个 C++11 线程安全集合类,它以 .NET BlockingCollection 类为模型。

【讨论】:

    【解决方案4】:

    我的并发无序地图版本 命名空间并发 {

    template<typename T,typename T1>
    class unordered_bucket: private std::unordered_map<T,T1>
    {
    mutable std::recursive_mutex m_mutex;
    
    public:
    T1 &operator [](T a)
    {
        std::lock_guard<std::recursive_mutex> l(m_mutex);
        return std::unordered_map<T,T1>::operator [](a);
    }
    
    size_t size() const noexcept {
        std::lock_guard<std::recursive_mutex> l(m_mutex);
        return  std::unordered_map<T,T1>::size();
    }
    
    vector<pair<T,T1>> toVector() const
    {
        std::lock_guard<std::recursive_mutex> l(m_mutex);
    
        vector<pair<T,T1>> ret;
        for(const pair<T,T1> &p:*this)
        {
            ret.push_back(p);
        }
        return ret;
    }
    
    bool find(const T &t) const
    {
        std::lock_guard<std::recursive_mutex> l(m_mutex);
        if(this->std::unordered_map<T,T1>::find(t) == this->end())
            return false;  //not found
        return true;
    }
    void erase()
    {
        std::lock_guard<std::recursive_mutex> l(m_mutex);
        this->unordered_map<T,T1>::erase(this->begin(),this->end());
    }
    void erase(const T &t)
    {
        std::lock_guard<std::recursive_mutex> l(m_mutex);
        this->unordered_map<T,T1>::erase(t);
    }
    };
    
    #define BUCKETCOUNT 10
    template<typename T,typename T1>
    class ConcurrentMap
    {
    std::vector<unordered_bucket<T,T1>> m_v;
    public:
    ConcurrentMap():m_v(BUCKETCOUNT){}   //using 10 buckets
    
    T1 &operator [](T a)
    {
        std::hash<T> h;
        return m_v[h(a)%BUCKETCOUNT][a];
    }
    
    size_t size() const noexcept {
        size_t cnt=0;
    
        for(const unordered_bucket<T,T1> &ub:m_v)
            cnt=cnt+ub.size();
    
        return  cnt;
    }
    
    vector<pair<T,T1>> toVector() const
    {
        vector<pair<T,T1>> ret;
        for(const unordered_bucket<T,T1> &u:m_v)
        {
            const vector<pair<T,T1>> &data=u.toVector();
            ret.insert(ret.end(),data.begin(),data.end());
        }
        return ret;
    }
    
    bool find(const T &t) const
    {
        for(const unordered_bucket<T,T1> &u:m_v)
            if(true == u.find(t))
                return true;
        return false;
    }
    void erase()
    {
        for(unordered_bucket<T,T1> &u:m_v)
            u.erase();
    }
    void erase(const T &t)
    {
        std::hash<T> h;
        unordered_bucket<T,T1> &ub = m_v[h(t)%BUCKETCOUNT];
        ub.erase(t);
    }
    };
    }
    

    【讨论】:

      【解决方案5】:

      C++11 本身不提供并发容器。但是,有库选项。 除了已经提到的 PPL,别忘了英特尔 TBB 库。

      它有一个并发的queuehash_mapsetvector 实现。但它不仅是一个线程安全的容器库,它还附带标准算法的并行版本(for-loop、reduce、sort...)。

      Intel TBB website

      【讨论】:

      • 可以给我并发集的链接吗?
      【解决方案6】:

      容器的界面根本就不是为了这个目标而设计的。对于他们使用的接口,客户端可见的锁确实是您可以在保证正确性和可预测行为的同时完成此操作的唯一方法。这也将非常低效,因为收购的数量会非常高(相对于良好的实施而言)。

      解决方案 1

      按值传递(如果适用)。

      解决方案 2

      创建一组简单的附加实现,您可以使用这些实现在持有范围锁的同时传递容器(将其视为伪 c++):

      template <typename TCollection>
      class t_locked_collection {
      public:
          t_locked_collection(TCollection& inCollection, t_lock& lock) : collection(inCollection), d_lock(lock), d_nocopy() {
          }
      
          TCollection& collection;
          // your convenience stuff
      private:
          t_scope_lock d_lock;
          t_nocopy d_nocopy;
      };
      

      然后调用者将锁与集合配对,然后您更新您的接口以在适当的情况下使用(传递)容器类型。这只是一个穷人的类扩展。

      这个锁定的容器是一个简单的例子,还有其他一些变体。这是我选择的路线,因为它确实允许您使用最适合您的程序的粒度级别,即使它不像锁定方法那样透明(在语法上)。调整现有程序也相对容易。与带有内部锁的集合不同,至少它的行为方式是可预测的。

      另一种变体是:

      template <typename TCollection>
      class t_lockable_collection {
      public:
      // ...
      private:
          TCollection d_collection;
          t_mutex d_mutex;
      };
      
      // example:
      typedef t_lockable_collection<std::vector<int> > t_lockable_int_vector;
      

      ...类似于t_locked_collection 的类型可用于公开底层集合。并不是说这种方法是万无一失的,只是防傻。

      【讨论】:

      • 使用“按值传递”,因为您的意思是按值传递完整容器以创建副本并处理副本?还是按值传递容器的项目?请详细说明。
      • @steffen 按值传递容器的元素。考虑到许多容器采用的接口,这(解决方案 1)远非最佳解决方案。除非您愿意编写大量异常处理并使用大量共享指针,否则该方法也几乎没有用处。
      猜你喜欢
      • 2013-01-23
      • 1970-01-01
      • 2023-03-23
      • 1970-01-01
      • 2010-09-17
      • 2010-10-09
      • 1970-01-01
      相关资源
      最近更新 更多