【问题标题】:How do I extend C++ boost list container to implement a thread safe implementation using boost upgrade mutex?如何扩展 C++ 升压列表容器以使用升压升级互斥体实现线程安全实现?
【发布时间】:2023-12-20 18:05:01
【问题描述】:

我编写了一些示例测试代码来验证使用 boost 升级互斥锁在 boost 列表容器上实现读/写互斥锁的功能。我有十个线程,5 个读者,5 个作者。
我使用智能指针来简化内存管理并允许同一对象包含在多个列表中。 编写者不断地删除对象并将其重新插入到各自的列表中,而读者则定期迭代列表。 这一切似乎都按预期工作,但是当调用列表擦除成员函数时,当我已经拥有它时,它必须找到要删除的条目。
擦除方法是否足够聪明,可以知道要擦除的条目而无需再次搜索,或者它是否经过优化以在列表元素已知时消除搜索?如果它进行搜索,那么是否有一种直接的方法来扩展它,以便只能在从列表中实际删除时应用唯一锁,而不是在查找列表元素时应用? 这是我与 boost 1.51 库链接并使用 vs2008 测试的代码。

  //******************************************************************************
  // INCLUDE FILES
  //******************************************************************************
  #include <boost/thread.hpp>
  #include <boost/date_time.hpp>
  #include <boost/thread/locks.hpp>  
  #include <boost/thread/shared_mutex.hpp>                  
  #include <boost/container/list.hpp>      
  #include <boost/shared_ptr.hpp>
  #include <boost/make_shared.hpp>
  #include <iostream>

  using namespace std;

  //******************************************************************************
  // LOCAL DEFINES
  //******************************************************************************
  #define NUM_THREADS  10
  #define NUM_WIDTH    5

  #ifdef UNIQUE_MUTEX
  #define MAIN_LIST_MUTEX           g_listMutex
  #define INT_LIST_MUTEX            g_intListMutex
  #define FLOAT_LIST_MUTEX          g_floatListMutex
  #else
  #define MAIN_LIST_MUTEX           g_listMutex
  #define INT_LIST_MUTEX            g_listMutex
  #define FLOAT_LIST_MUTEX          g_listMutex
  #endif

  //******************************************************************************
  // LOCAL TYPEDEFS
  //******************************************************************************
  typedef boost::upgrade_mutex                   myMutex;
  typedef boost::shared_lock<myMutex>            SharedLock;
  typedef boost::upgrade_to_unique_lock<myMutex> UniqueLock;
  typedef boost::upgrade_lock<myMutex>           UpgradeLock;
  class myDataIntf;
  typedef boost::shared_ptr<myDataIntf>          myDataIntfPtr;
  typedef boost::container::list<myDataIntfPtr>  myList;

  //******************************************************************************
  // LOCAL CLASS DECLARATIONS
  //******************************************************************************
  class myDataIntf
  {
  public:
     virtual char* getDataType(void) = 0;
  };

  class intData : public myDataIntf
  {
  private:
     int  data;

  public:
     intData(int new_data) : data(new_data){};
     ~intData(void)
     {
        extern int instIntDeletes;

        instIntDeletes++;
     };
     char* getDataType(void)
     {
        return "Int";
     }
     int getData(void)
     {
        return data;
     }
     void setData(int new_data)
     {
        data = new_data;
     }
  };

  class floatData : public myDataIntf
  {
  private:
     float  data;

  public:
     floatData(float new_data) : data(new_data){};
     ~floatData(void)
     {
        extern int instFloatDeletes;

        instFloatDeletes++;
     };
     char* getDataType(void)
     {
        return "Float";
     }
     float getData(void)
     {
        return data;
     }
     void setData(float new_data)
     {
        data = new_data;
     }
  };

  //******************************************************************************
  // LOCALLY DEFINED GLOBAL DATA
  //******************************************************************************

  // Define one mutex per linked list
  myMutex  g_listMutex;
  myMutex  g_intListMutex;
  myMutex  g_floatListMutex;

  int instReadFloatCount[NUM_THREADS];
  int instWriteFloatCount[NUM_THREADS];
  int instReadIntCount[NUM_THREADS];
  int instWriteIntCount[NUM_THREADS];
  int instFloatDeletes = 0;
  int instIntDeletes = 0;

  //******************************************************************************
  // Worker Thread function
  //******************************************************************************
  void workerFunc(int inst, myList* assigned_list, myMutex*  mutex)
  {
     boost::posix_time::millisec workTime(1*inst);
     myList::iterator            i;
     int                         add_delay = 0;
     int                         add_f_count = 0;
     int                         add_i_count = 0;

     instReadFloatCount[inst] = 0;
     instReadIntCount[inst] = 0;
     instWriteIntCount[inst] = 0;
     instWriteFloatCount[inst] = 0;

     mutex->lock();
     cout << "Worker " << inst << ": ";
     for (i =  assigned_list->begin(); i != assigned_list->end(); ++i)
     {
        cout << (*i)->getDataType();
        if ( 0 == strcmp("Float", (*i)->getDataType() ) )
        {
           floatData*  f = (floatData*)i->get();
           cout << " " << f->getData() << " ";
        }
        if ( 0 == strcmp("Int", (*i)->getDataType() ) )
        {
           intData*  f = (intData*)i->get();
           cout << " " << f->getData() << " ";
        }
     }
     cout << endl;
     mutex->unlock();

     // Do some work for 10 seconds.
     for ( int tick = 0; tick < 10000/(1*inst+1); tick++)
     {
        add_delay++;
        boost::this_thread::sleep(workTime);
        if ( inst < (NUM_THREADS/2) )
        {
           // reader - Get a shared lock that allows multiple readers to
           // access the linked list. Upgrade locks act as shared locks
           // until converted to unique locks, at which point the 
           // thread converting to the unique lock will block until
           // all existing readers are done.  New readers will wait
           // after the unique lock is released.
           SharedLock shared_lock(*mutex);

           for (i =  assigned_list->begin(); i != assigned_list->end(); ++i)
           {
              if ( 0 == strcmp("Float", (*i)->getDataType() ) )
              {
                 floatData*  f = (floatData*)i->get();
                 instReadFloatCount[inst]++;
              }
              if ( 0 == strcmp("Int", (*i)->getDataType() ) )
              {
                 intData*  f = (intData*)i->get();
                 instReadIntCount[inst]++;
              }
           }
        }
        else
        {
           // writer - get the upgrade lock that will allow us
           // to make multiple modifications to the linked list
           // without being interrupted by other writers (other writers attempting
           // to get an upgrade lock will block until the writer that
           // has it releases it.)
           UpgradeLock  upgrade_lock(*mutex);

           for (i =  assigned_list->begin(); i != assigned_list->end(); )
           {
              if ( 0 == strcmp("Float", (*i)->getDataType() ) )
              {
                 floatData*   f = (floatData*)i->get();
                 UniqueLock   unique_lock(upgrade_lock); // Convert an existing upgrade lock to unique lock

                 f->setData(f->getData() + 0.123f);
                 assigned_list->push_front(*i); 
                 assigned_list->erase(i++);
                 instWriteFloatCount[inst]++;

                 // While the unique lock is in scope let's do some additional
                 // adds & deletes
                 if ( (add_delay > 100) && (add_f_count < 2) )
                 {
                    if ( add_f_count < 1)
                    {
                       // Delete the first record
                    }
                    else if ( add_f_count < 2)
                    {
                       // Add new item using separate allocations for smart pointer & data
                       assigned_list->insert(assigned_list->end(), new floatData(-(float)(inst*10000+add_f_count)));
                    }
                    else
                    {
                       // Add new item using make_shared function template.  Both objects are created using one allocation.
                       assigned_list->insert(assigned_list->end(), boost::make_shared<floatData>(-(float)(inst*10000+add_f_count)));
                    }
                    add_f_count++;
                 }
              }
              else if ( 0 == strcmp("Int", (*i)->getDataType() ) )
              {
                 intData*     f = (intData*)i->get();
                 UniqueLock   unique_lock(upgrade_lock); // Convert an existing upgrade lock to unique lock

                 f->setData(f->getData() + 123);
                 assigned_list->push_front(*i);
                 assigned_list->erase(i++);
                 instWriteIntCount[inst]++;

                 // While the unique lock is in scope let's do some additional
                 // adds & deletes
                 if ( (add_delay > 100) && (add_i_count < 3) )
                 {
                    if ( add_i_count < 1)
                    {
                       // Delete the first record
                    }
                    else if ( add_i_count < 2)
                    {
                       // Add new item using separate allocations for smart pointer & data
                       assigned_list->insert(assigned_list->end(), new intData(-(int)(inst*10000+add_i_count)));
                    }
                    else
                    {
                       // Add new item using make_shared function template.  Both objects are created using one allocation.
                       assigned_list->insert(assigned_list->end(), boost::make_shared<intData>(-(int)(inst*10000+add_i_count)));
                    }
                    add_i_count++;
                 }
              }
              else
              {
                 ++i;
              }
           }
        }
     }

     cout << "Worker: finished" << " " << inst << endl;
  }

  //******************************************************************************
  // Main Function
  //******************************************************************************
  int main(int argc, char* argv[])
  {
     {
        myList              test_list;
        myList              test_list_ints;
        myList              test_list_floats;
        myList::iterator    i;

        // Fill the main list with some values
        test_list.insert(test_list.end(), new intData(1));
        test_list.insert(test_list.end(), new intData(2));
        test_list.insert(test_list.end(), new intData(3));
        test_list.insert(test_list.end(), new floatData(333.333f));
        test_list.insert(test_list.end(), new floatData(555.555f));
        test_list.insert(test_list.end(), new floatData(777.777f));

        // Display the main list elements and add the specific values
        // for each specialized list containing specific types of elements.
        // The end result is that each object in the main list will also
        // be in the specialized list.
        cout << "test:";
        for (i =  test_list.begin(); i != test_list.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           if ( 0 == strcmp("Float", (*i)->getDataType() ) )
           {
              floatData*  f = (floatData*)i->get();
              cout << " " << f->getData();
              test_list_floats.insert(test_list_floats.end(), *i);
           }
           if ( 0 == strcmp("Int", (*i)->getDataType() ) )
           {
              intData*  f = (intData*)i->get();
              cout << " " << f->getData();
              test_list_ints.insert(test_list_ints.end(), *i);
           }
        }
        cout << endl;

        // Display the list with float type elements
        cout << "float test:";
        for (i =  test_list_floats.begin(); i != test_list_floats.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           floatData*  f = (floatData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;

        // Display the list with integer type elements
        cout << "int test:";
        for (i =  test_list_ints.begin(); i != test_list_ints.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           intData*  f = (intData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;

        // NOTE: To reduce mutex bottleneck coupling in a real application it is recommended that
        // each linked list have it's own shareable mutex.
        // I used the same mutex here for all three lists to have the output from each thread
        // appear in a single line. If I use one mutex per thread then it would appear
        // jumbled up and almost unreadable.
        // To use a unique mutex per list enable UNIQUE_MUTEX macro.
        // For this test I did not notice any performance differences, but that will
        // depend largely on how long the unique lock is held.
        boost::thread workerThread0(workerFunc, 0, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread1(workerFunc, 1, &test_list_ints,   &INT_LIST_MUTEX);
        boost::thread workerThread2(workerFunc, 2, &test_list_floats, &FLOAT_LIST_MUTEX);
        boost::thread workerThread3(workerFunc, 3, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread4(workerFunc, 4, &test_list_floats, &FLOAT_LIST_MUTEX);
        boost::thread workerThread5(workerFunc, 5, &test_list_ints,   &INT_LIST_MUTEX);
        boost::thread workerThread6(workerFunc, 6, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread7(workerFunc, 7, &test_list_floats, &FLOAT_LIST_MUTEX);
        boost::thread workerThread8(workerFunc, 8, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread9(workerFunc, 9, &test_list_ints,   &INT_LIST_MUTEX);
        workerThread0.join();
        workerThread1.join();
        workerThread2.join();
        workerThread3.join();
        workerThread4.join();
        workerThread5.join();
        workerThread6.join();
        workerThread7.join();
        workerThread8.join();
        workerThread9.join();

        cout << "*** Test End ***:";
        for (i =  test_list.begin(); i != test_list.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           if ( 0 == strcmp("Float", (*i)->getDataType() ) )
           {
              floatData*  f = (floatData*)i->get();
              cout << " " << f->getData();
           }
           if ( 0 == strcmp("Int", (*i)->getDataType() ) )
           {
              intData*  f = (intData*)i->get();
              cout << " " << f->getData();
           }
        }
        cout << endl;
        cout << "float test end:";
        for (i =  test_list_floats.begin(); i != test_list_floats.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           floatData*  f = (floatData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;
        cout << "int test end:";
        for (i =  test_list_ints.begin(); i != test_list_ints.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           intData*  f = (intData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;
        cout << "*** thread counts***" << endl;
        for ( int idx = 0; idx < NUM_THREADS; idx++)
        {
           cout << "    thread " << idx;
           cout << ": int rd(" << setw(NUM_WIDTH) << instReadIntCount[idx];
           cout << ") int wr(" << setw(NUM_WIDTH) << instWriteIntCount[idx];
           cout << ") flt rd(" << setw(NUM_WIDTH) << instReadFloatCount[idx];
           cout << ") flt wr(" << setw(NUM_WIDTH) << instWriteFloatCount[idx];
           cout << ")" <<  endl;
        }
     }

     // All records in the linked list have now been deallocated automatically(due to smart pointer)
     // as the linked list objects have been destroyed due to going out of scope.  
     cout << "*** Object Deletion counts***" << endl;
     cout << "  int deletes: " << instIntDeletes << endl;
     cout << "float deletes: " << instFloatDeletes << endl;

     return 0;
  }

【问题讨论】:

    标签: c++ list shared-ptr boost-thread boost-mutex


    【解决方案1】:

    boost::container::list::erase(const_iterator) 的复杂度为amortised constant time(在boost/container/list.hpp 中搜索iterator erase(const_iterator p))。所以调用这个函数时不会重复搜索。

    不过,我想说几点。

    最近一位并发专家建议我,只有在确定明确需要之后才使用 UpgradeLockable 概念是明智的;即在分析之后。与upgrade_mutexes 关联的锁必然比简单的boost::mutex::scoped_locks 或std::lock_guards 更复杂,因此性能较差。

    在您的示例中,您可能会发现当前(更复杂的)设置与将 upgrade_mutex 替换为 mutex 并始终独占锁定之间没有显着的性能差异。

    另一点是,您的代码 cmets 似乎表明您认为给定 upgrade_mutex 上的 boost::upgrade_lock 的多个实例可以共存。不是这种情况。一次只能有一个单个线程持有upgrade_lock

    其他几个线程可能持有shared_locks,而upgrade_lock 被持有,但这些shared_locks 必须在upgrade_lock 升级为唯一之前释放。

    有关详细信息,请参阅UpgradeLockable Concept 上的 boost 文档。


    编辑

    只是为了确认下面 cmets 中的观点,下面的示例显示了新的 shared_locks 可以在 upgrade_lock 存在时获取,但在 upgrade_to_unique_lock 存在时不能获取(使用 boost 1.51 测试):

    #include <iostream>
    #include <vector>
    
    #include <boost/thread.hpp>
    #include <boost/date_time.hpp>
    #include <boost/thread/locks.hpp>
    #include <boost/thread/shared_mutex.hpp>
    
    typedef boost::shared_lock<boost::upgrade_mutex>            SharedLock;
    typedef boost::upgrade_to_unique_lock<boost::upgrade_mutex> UniqueLock;
    typedef boost::upgrade_lock<boost::upgrade_mutex>           UpgradeLock;
    
    boost::upgrade_mutex the_mutex;
    
    void Write() {
      UpgradeLock upgrade_lock(the_mutex);
      std::cout << "\tPreparing to write\n";
      boost::this_thread::sleep(boost::posix_time::seconds(1));
      UniqueLock unique_lock(upgrade_lock);
      std::cout << "\tStarting to write\n";
      boost::this_thread::sleep(boost::posix_time::seconds(5));
      std::cout << "\tDone writing.\n";
    }
    
    void Read() {
      SharedLock lock(the_mutex);
      std::cout << "Starting to read.\n";
      boost::this_thread::sleep(boost::posix_time::seconds(1));
      std::cout << "Done reading.\n";
    }
    
    int main() {
      // Start a read operation
      std::vector<boost::thread> reader_threads;
      reader_threads.push_back(std::move(boost::thread(Read)));
      boost::this_thread::sleep(boost::posix_time::milliseconds(250));
    
      // Start a write operation.  This will block trying to upgrade
      // the UpgradeLock to UniqueLock since a SharedLock currently exists.
      boost::thread writer_thread(Write);
    
      // Start several other read operations.  These won't be blocked
      // since only an UpgradeLock and SharedLocks currently exist.
      for (int i = 0; i < 25; ++i) {
        boost::this_thread::sleep(boost::posix_time::milliseconds(100));
        reader_threads.push_back(std::move(boost::thread(Read)));
      }
    
      // Join the readers.  This allows the writer to upgrade to UniqueLock
      // since it's currently the only lock.
      for (auto& reader_thread : reader_threads)
        reader_thread.join();
    
      // Start a new read operation.  This will be blocked since a UniqueLock
      // currently exists.
      boost::this_thread::sleep(boost::posix_time::milliseconds(100));
      boost::thread reader_thread(Read);
    
      writer_thread.join();
      reader_thread.join();
    
      return 0;
    }
    

    【讨论】:

    • 嗨弗雷泽,我更正了关于锁如何工作的评论。据我从 boost 文档中了解到,多个共享锁和单个升级锁可以共存。问题是在升级锁处于活动状态时新的共享锁会阻塞直到升级锁被释放,这确保了升级锁最终可以转换为唯一锁。尝试获取升级锁的多个编写者将被阻塞,直到当前升级锁被释放,但在任何时候只有一个可以获取它。
    • 另外,关于性能,我知道有与之相关的开销,但在我的应用程序中,大多数时候只有读取器锁和偶尔的写入器锁。主要目标是尽量减少对使用读取器锁遍历列表的线程的性能影响。通过使用共享读锁,多个线程可以在同一个列表上进行迭代,争用由线程优先级处理(这个 WC7 因此调度程序严格基于优先级抢占式)。
    • @RicardoAndujar 当存在 upgrade_lock 时,没有什么可以阻止新的 shared_locks 被获取。更重要的是,升级到 unique_lock 的尝试将被阻塞,直到所有 shared_locks 被释放。当写入者等待升级锁时,更多的读取者可以获得shared_locks。所以你可能会遇到这样的情况,即有这么多读者,总是一个 shared_lock,这意味着作者永远不会有机会获得 unique_lock。从那里你进入 try_lock 或定时变体领域 - 更复杂。
    • 弗雷泽,我唯一需要做的就是弄清楚是否需要用 sw 封装列表代码以自动执行锁定,但此时我只看到需要使mutex 派生列表类的一部分。感谢您的回复。
    • 我对其进行了测试,并且确实似乎新读者在升级后的锁后面被阻塞了。我相信您的描述与升级锁的早期版本相符,但我需要进行更多测试以确认。我将此响应基于调试器信息,该信息同时显示活动读取器锁和等待读取器锁,而仅存在升级锁。
    最近更新 更多