【问题标题】:Spinning thread barrier using Atomic Builtins使用 Atomic Builtins 旋转线程屏障
【发布时间】:2016-02-09 11:35:06
【问题描述】:

我正在尝试使用原子实现旋转线程屏障,特别是 __sync_fetch_and_add。 https://gcc.gnu.org/onlinedocs/gcc-4.4.5/gcc/Atomic-Builtins.html

我基本上想要一个替代 pthread 屏障的方法。我在一个可以并行运行大约一百个线程的系统上使用 Ubuntu。

int bar = 0;                      //global variable
 int P = MAX_THREADS;              //number of threads

 __sync_fetch_and_add(&bar,1);     //each thread comes and adds atomically
 while(bar<P){}                    //threads spin until bar increments to P
 bar=0;                            //a thread sets bar=0 to be used in the next spinning barrier

由于显而易见的原因,这不起作用(一个线程可能设置 bar=0,而另一个线程陷入无限的 while 循环等)。我在这里看到了一个实现:Writing a (spinning) thread barrier using c++11 atomics,但是它看起来太复杂了,我认为它的性能可能比 pthread barrier 差。

由于 bar 的缓存行在线程之间进行乒乓,因此预计此实现还会在内存层次结构中产生更多流量。

关于如何使用这些原子指令来制作简单的屏障有什么想法吗?此外,通信优化方案也会有所帮助。

【问题讨论】:

    标签: c++ multithreading atomic compare-and-swap barrier


    【解决方案1】:

    与其在线程的计数器上旋转,不如在通过的barries数上旋转,它只会由最后一个线程递增,面对障碍。这种方式还可以减少内存缓存压力,因为旋转变量现在仅由单线程更新。

    int P = MAX_THREADS;
    int bar = 0; // Counter of threads, faced barrier.
    volatile int passed = 0; // Number of barriers, passed by all threads.
    
    void barrier_wait()
    {
        int passed_old = passed; // Should be evaluated before incrementing *bar*!
    
        if(__sync_fetch_and_add(&bar,1) == (P - 1))
        {
            // The last thread, faced barrier.
            bar = 0;
            // *bar* should be reseted strictly before updating of barriers counter.
            __sync_synchronize(); 
            passed++; // Mark barrier as passed.
        }
        else
        {
            // Not the last thread. Wait others.
            while(passed == passed_old) {};
            // Need to synchronize cache with other threads, passed barrier.
            __sync_synchronize();
        }
    }
    

    注意,您需要使用volatile 修饰符来旋转变量。

    C++ 代码可能比 C 代码快一些,因为它可以使用 acquire/release 内存屏障而不是 full 内存屏障,后者是__sync 函数中唯一可用的障碍:

    int P = MAX_THREADS;
    std::atomic<int> bar = 0; // Counter of threads, faced barrier.
    std::atomic<int> passed = 0; // Number of barriers, passed by all threads.
    
    void barrier_wait()
    {
        int passed_old = passed.load(std::memory_order_relaxed);
    
        if(bar.fetch_add(1) == (P - 1))
        {
            // The last thread, faced barrier.
            bar = 0;
            // Synchronize and store in one operation.
            passed.store(passed_old + 1, std::memory_order_release);
        }
        else
        {
            // Not the last thread. Wait others.
            while(passed.load(std::memory_order_relaxed) == passed_old) {};
            // Need to synchronize cache with other threads, passed barrier.
            std::atomic_thread_fence(std::memory_order_acquire);
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-08-04
      • 2013-01-18
      • 2010-11-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-09-19
      • 2014-05-24
      相关资源
      最近更新 更多