【发布时间】:2011-10-26 23:02:35
【问题描述】:
我正在查看无锁队列的示例代码:
http://drdobbs.com/high-performance-computing/210604448?pgno=2
(在许多 SO 问题中也有参考,例如Is there a production ready lock-free queue or hash implementation in C++)
这看起来应该适用于单个生产者/消费者,尽管代码中有许多拼写错误。我已将代码更新为如下所示,但它让我崩溃了。有人有什么建议吗?
特别是,应该将divider 和last 声明为:
atomic<Node *> divider, last; // shared
我在这台机器上没有支持 C++0x 的编译器,所以也许这就是我所需要的...
// Implementation from http://drdobbs.com/high-performance-computing/210604448
// Note that the code in that article (10/26/11) is broken.
// The attempted fixed version is below.
template <typename T>
class LockFreeQueue {
private:
struct Node {
Node( T val ) : value(val), next(0) { }
T value;
Node* next;
};
Node *first, // for producer only
*divider, *last; // shared
public:
LockFreeQueue()
{
first = divider = last = new Node(T()); // add dummy separator
}
~LockFreeQueue()
{
while( first != 0 ) // release the list
{
Node* tmp = first;
first = tmp->next;
delete tmp;
}
}
void Produce( const T& t )
{
last->next = new Node(t); // add the new item
last = last->next; // publish it
while (first != divider) // trim unused nodes
{
Node* tmp = first;
first = first->next;
delete tmp;
}
}
bool Consume( T& result )
{
if (divider != last) // if queue is nonempty
{
result = divider->next->value; // C: copy it back
divider = divider->next; // D: publish that we took it
return true; // and report success
}
return false; // else report empty
}
};
我编写了以下代码来测试这一点。 Main(未显示)只是调用 TestQ()。
#include "LockFreeQueue.h"
const int numThreads = 1;
std::vector<LockFreeQueue<int> > q(numThreads);
void *Solver(void *whichID)
{
int id = (long)whichID;
printf("Thread %d initialized\n", id);
int result = 0;
do {
if (q[id].Consume(result))
{
int y = 0;
for (int x = 0; x < result; x++)
{ y++; }
y = 0;
}
} while (result != -1);
return 0;
}
void TestQ()
{
std::vector<pthread_t> threads;
for (int x = 0; x < numThreads; x++)
{
pthread_t thread;
pthread_create(&thread, NULL, Solver, (void *)x);
threads.push_back(thread);
}
for (int y = 0; y < 1000000; y++)
{
for (unsigned int x = 0; x < threads.size(); x++)
{
q[x].Produce(y);
}
}
for (unsigned int x = 0; x < threads.size(); x++)
{
q[x].Produce(-1);
}
for (unsigned int x = 0; x < threads.size(); x++)
pthread_join(threads[x], 0);
}
更新:最终崩溃是由队列声明引起的:
std::vector<LockFreeQueue<int> > q(numThreads);
当我将它更改为一个简单的数组时,它运行良好。 (我实现了一个带锁的版本,它也崩溃了。)我看到析构函数在构造函数之后立即被调用,导致内存双重释放。但是,有谁知道为什么会立即使用 std::vector 调用析构函数?
【问题讨论】:
-
@OliCharlesworth "malloc: *** error for object 0x100100a20: pointer being free was not assigned" on line "delete tmp;"
-
另一种实现 single reader_single_ writer 队列的简单方法是使用 2 个队列和一个原子“交换缓冲区”:队列 @987654327@ 用于读取,队列 @ 987654328@ 用于写入,“交换缓冲区”
S最初设置为 null。用法:生产者写完后:交换W <-> S。消费者像往常一样从R读取,如果R用尽则交换R <-> S;交换后尝试再读一次。最后,如果读取器(或写入器)遇到一个 nullptr before 读取(或写入)然后交换R <-> S(或W <-> S)... -
... 优点是简单;缺点是如果在读取之间没有写入任何内容,消费者可能会反复交换空队列。在我的实现中,我实现了一个
listen()方法,该方法会阻止阅读器,直到作者通知它为止。