【发布时间】:2012-01-05 06:25:32
【问题描述】:
我正在尝试在 pthread、semaphore.h 和 gcc atomic builtins 之上实现一个由循环缓冲区支持的高性能阻塞队列。队列需要同时处理来自不同线程的多个读取器和写入器。
我已经隔离了某种竞争条件,我不确定这是否是对某些原子操作和信号量行为的错误假设,或者我的设计是否存在根本缺陷。
我已将其提取并简化为以下独立示例。我希望这个程序永远不会返回。然而,它确实会在几十万次迭代后返回,并在队列中检测到损坏。
在下面的示例中(用于说明),它实际上并没有存储任何内容,它只是将一个保存实际数据的单元格设置为 1,将一个空单元格设置为 0。有一个计数信号量(vacancys)表示空闲单元格的数量,另一个计数信号量(occupants)表示占用单元格的数量。
作家做以下事情:
- 减少职位空缺
- 以原子方式获取下一个头部索引(mod 队列大小)
- 给它写信
- 增加居住人数
读者反其道而行之:
- 减少占用人数
- 自动获取下一个尾索引(mod 队列大小)
- 从中读取
- 增加职位空缺
我希望鉴于上述情况,恰好一个线程可以同时读取或写入任何给定的单元格。
任何关于它为什么不起作用或调试策略的想法都值得赞赏。下面的代码和输出...
#include <stdlib.h>
#include <semaphore.h>
#include <iostream>
using namespace std;
#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2
struct CountingSemaphore
{
sem_t m;
CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
void post() { sem_post(&m); }
void wait() { sem_wait(&m); }
~CountingSemaphore() { sem_destroy(&m); }
};
struct BlockingQueue
{
unsigned int head; // (head % capacity) is next head position
unsigned int tail; // (tail % capacity) is next tail position
CountingSemaphore vacancies; // how many cells are vacant
CountingSemaphore occupants; // how many cells are occupied
int cell[QUEUE_CAPACITY];
// (cell[x] == 1) means occupied
// (cell[x] == 0) means vacant
BlockingQueue() :
head(0),
tail(0),
vacancies(QUEUE_CAPACITY),
occupants(0)
{
for (size_t i = 0; i < QUEUE_CAPACITY; i++)
cell[i] = 0;
}
// put an item in the queue
void put()
{
vacancies.wait();
// atomic post increment
set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);
occupants.post();
}
// take an item from the queue
void take()
{
occupants.wait();
// atomic post increment
get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);
vacancies.post();
}
// set cell i
void set(unsigned int i)
{
// atomic compare and assign
if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
{
corrupt("set", i);
exit(-1);
}
}
// get cell i
void get(unsigned int i)
{
// atomic compare and assign
if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
{
corrupt("get", i);
exit(-1);
}
}
// corruption detected
void corrupt(const char* action, unsigned int i)
{
static CountingSemaphore sem(1);
sem.wait();
cerr << "corruption detected" << endl;
cerr << "action = " << action << endl;
cerr << "i = " << i << endl;
cerr << "head = " << head << endl;
cerr << "tail = " << tail << endl;
for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
cerr << "cell[" << j << "] = " << cell[j] << endl;
}
};
BlockingQueue q;
// keep posting to the queue forever
void* Source(void*)
{
while (true)
q.put();
return 0;
}
// keep taking from the queue forever
void* Sink(void*)
{
while (true)
q.take();
return 0;
}
int main()
{
pthread_t id;
// start some pthreads to run Source function
for (int i = 0; i < NUM_THREADS; i++)
if (pthread_create(&id, NULL, &Source, 0))
abort();
// start some pthreads to run Sink function
for (int i = 0; i < NUM_THREADS; i++)
if (pthread_create(&id, NULL, &Sink, 0))
abort();
while (true);
}
将上面的编译如下:
$ g++ -pthread AboveCode.cpp
$ ./a.out
每次输出都不一样,这里举一个例子:
corruption detected
action = get
i = 6
head = 122685
tail = 122685
cell[0] = 0
cell[1] = 0
cell[2] = 1
cell[3] = 0
cell[4] = 1
cell[5] = 0
cell[6] = 1
cell[7] = 1
我的系统是 Intel Core 2 上的 Ubuntu 11.10:
$ uname -a
Linux 3.0.0-14-generic #23-Ubuntu SMP \
Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
$ cat /proc/cpuinfo | grep Intel
model name : Intel(R) Core(TM)2 Quad CPU Q9300 @ 2.50GHz
$ g++ --version
g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1
谢谢, 安德鲁。
【问题讨论】:
-
不幸的是,很难理解这里出了什么问题......您不使用已经存在和调试过的版本有什么原因吗?例如,在性能方面,您的版本在多核系统上可能存在错误共享问题。
-
虚假共享问题是指原子操作中的隐式内存屏障?强制将 Lx 缓存写回主内存等。我不确定如何避免这种情况?两个或多个读取器/写入器可能在不同 CPU 上的不同线程上运行,那么您还建议如何同步它们?
-
并非如此。错误共享是关于缓存争用问题,当两个不同的内核访问不同的变量时......恰好落在同一个缓存行中。由于独占所有权(写入所需)是在缓存行的基础上协商的,因此两个内核必须序列化它们的操作,即使它们正在访问和修改两个语义不同的变量。它可能真的会损害性能......并且现有的优化实现已经解决了这个问题:)
-
现有的哪些实现解决了这个问题,如何解决?您能否提供一个文件/行参考作为示例。
标签: c++ linux multithreading semaphore blockingqueue