【发布时间】:2015-02-09 02:53:50
【问题描述】:
编辑 @Mike 指出我在下面代码中的 try_lock 函数是不安全的,并且访问器的创建也会产生竞争条件。 (来自所有人的)建议使我确信我走错了路。
原始问题
锁定嵌入式微控制器的要求与多线程有很大不同,因此我无法将多线程示例转换为我的嵌入式应用程序。通常我没有任何类型的操作系统或线程,只有main 以及硬件定期调用的任何中断函数。
我需要从中断填充缓冲区是很常见的,但在main 中处理它。我在下面创建了IrqMutex 类来尝试安全地实现它。每个试图访问缓冲区的人都通过IrqMutexAccessor 分配了一个唯一的ID,然后他们每个人都可以try_lock() 和unlock()。阻塞lock() 函数的想法不适用于中断,因为除非您允许中断完成,否则没有其他代码可以执行,因此unlock() 代码永远不会运行。不过,我偶尔会使用main() 代码中的阻塞锁。
但是,我知道如果没有 C++11 内存屏障(在许多嵌入式平台上不可用),双重检查锁就无法工作。老实说,尽管阅读了很多关于它的内容,但我并不真正理解内存访问重新排序如何/为什么会导致问题。我认为 volatile sig_atomic_t 的使用(可能与唯一 ID 的使用结合使用)使得这与双重检查锁不同。但我希望有人可以:确认以下代码是正确的,解释为什么它不安全,或提供更好的方法来完成此操作。
class IrqMutex {
friend class IrqMutexAccessor;
private:
std::sig_atomic_t accessorIdEnum;
volatile std::sig_atomic_t owner;
protected:
std::sig_atomic_t nextAccessor(void) { return ++accessorIdEnum; }
bool have_lock(std::sig_atomic_t accessorId) {
return (owner == accessorId);
}
bool try_lock(std::sig_atomic_t accessorId) {
// Only try to get a lock, while it isn't already owned.
while (owner == SIG_ATOMIC_MIN) {
// <-- If an interrupt occurs here, both attempts can get a lock at the same time.
// Try to take ownership of this Mutex.
owner = accessorId; // SET
// Double check that we are the owner.
if (owner == accessorId) return true;
// Someone else must have taken ownership between CHECK and SET.
// If they released it after CHECK, we'll loop back and try again.
// Otherwise someone else has a lock and we have failed.
}
// This shouldn't happen unless they called try_lock on something they already owned.
if (owner == accessorId) return true;
// If someone else owns it, we failed.
return false;
}
bool unlock(std::sig_atomic_t accessorId) {
// Double check that the owner called this function (not strictly required)
if (owner == accessorId) {
owner = SIG_ATOMIC_MIN;
return true;
}
// We still return true if the mutex was unlocked anyway.
return (owner == SIG_ATOMIC_MIN);
}
public:
IrqMutex(void) : accessorIdEnum(SIG_ATOMIC_MIN), owner(SIG_ATOMIC_MIN) {}
};
// This class is used to manage our unique accessorId.
class IrqMutexAccessor {
friend class IrqMutex;
private:
IrqMutex& mutex;
const std::sig_atomic_t accessorId;
public:
IrqMutexAccessor(IrqMutex& m) : mutex(m), accessorId(m.nextAccessor()) {}
bool have_lock(void) { return mutex.have_lock(accessorId); }
bool try_lock(void) { return mutex.try_lock(accessorId); }
bool unlock(void) { return mutex.unlock(accessorId); }
};
因为只有一个处理器,并且互斥体没有线程化,所以我认为它的用途与正常情况略有不同。我反复遇到两个主要用例。
- 中断是一个生产者,它拥有一个空闲缓冲区的所有权,并用一个数据包加载它。中断/生产者可能会长时间保持其所有权锁定,跨越多个中断调用。主要功能是消费者,当它准备好处理它时,它会拥有一个完整的缓冲区。竞争条件很少发生,但如果中断/生产者完成一个数据包并需要一个新缓冲区,但它们都已满,它将尝试使用最旧的缓冲区(这是一个丢包事件)。如果主/消费者在完全相同的时间开始读取和处理最旧的缓冲区,它们将互相践踏。
- 中断只是对某物(如计数器)的快速更改或增量。但是,如果我们想通过调用 main() 代码来重置计数器或跳转到某个新值,我们不想在计数器发生变化时尝试写入计数器。这里 main 实际上执行了一个阻塞循环来获取锁,但是我认为几乎不可能必须在这里实际等待两次以上的尝试。一旦它有一个锁,任何对计数器中断的调用都会被跳过,但这对于像计数器这样的东西来说通常不是什么大问题。然后我更新计数器值并解锁它,以便它可以再次开始递增。
我意识到这两个示例有点笨拙,但是这些模式的某些版本出现在我从事的每个项目的许多外围设备中,我希望有一段可重用的代码可以安全地处理各种嵌入式平台。我包括了 C 标记,因为所有这些都可以直接转换为 C 代码,并且在某些嵌入式编译器上,这就是所有可用的。所以我试图找到一种保证在 C 和 C++ 中都能工作的通用方法。
struct ExampleCounter {
volatile long long int value;
IrqMutex mutex;
} exampleCounter;
struct ExampleBuffer {
volatile char data[256];
volatile size_t index;
IrqMutex mutex; // One mutex per buffer.
} exampleBuffers[2];
const volatile char * const REGISTER;
// This accessor shouldn't be created in an interrupt or a race condition can occur.
static IrqMutexAccessor myMutex(exampleCounter.mutex);
void __irqQuickFunction(void) {
// Obtain a lock, add the data then unlock all within one function call.
if (myMutex.try_lock()) {
exampleCounter.value++;
myMutex.unlock();
} else {
// If we failed to obtain a lock, we skipped this update this one time.
}
}
// These accessors shouldn't be created in an interrupt or a race condition can occur.
static IrqMutexAccessor myMutexes[2] = {
IrqMutexAccessor(exampleBuffers[0].mutex),
IrqMutexAccessor(exampleBuffers[1].mutex)
};
void __irqLongFunction(void) {
static size_t bufferIndex = 0;
// Check if we have a lock.
if (!myMutex[bufferIndex].have_lock() and !myMutex[bufferIndex].try_lock()) {
// If we can't get a lock try the other buffer
bufferIndex = (bufferIndex + 1) % 2;
// One buffer should always be available so the next line should always be successful.
if (!myMutex[bufferIndex].try_lock()) return;
}
// ... at this point we know we have a lock ...
// Get data from the hardware and modify the buffer here.
const char c = *REGISTER;
exampleBuffers[bufferIndex].data[exampleBuffers[bufferIndex].index++] = c;
// We may keep the lock for multiple function calls until the end of packet.
static const char END_PACKET_SIGNAL = '\0';
if (c == END_PACKET_SIGNAL) {
// Unlock this buffer so it can be read from main.
myMutex[bufferIndex].unlock();
// Switch to the other buffer for next time.
bufferIndex = (bufferIndex + 1) % 2;
}
}
int main(void) {
while (true) {
// Mutex for counter
static IrqMutexAccessor myCounterMutex(exampleCounter.mutex);
// Change counter value
if (EVERY_ONCE_IN_A_WHILE) {
// Skip any updates that occur while we are updating the counter.
while(!myCounterMutex.try_lock()) {
// Wait for the interrupt to release its lock.
}
// Set the counter to a new value.
exampleCounter.value = 500;
// Updates will start again as soon as we unlock it.
myCounterMutex.unlock();
}
// Mutexes for __irqLongFunction.
static IrqMutexAccessor myBufferMutexes[2] = {
IrqMutexAccessor(exampleBuffers[0].mutex),
IrqMutexAccessor(exampleBuffers[1].mutex)
};
// Process buffers from __irqLongFunction.
for (size_t i = 0; i < 2; i++) {
// Obtain a lock so we can read the data.
if (!myBufferMutexes[i].try_lock()) continue;
// Check that the buffer isn't empty.
if (exampleBuffers[i].index == 0) {
myBufferMutexes[i].unlock(); // Don't forget to unlock.
continue;
}
// ... read and do something with the data here ...
exampleBuffer.index = 0;
myBufferMutexes[i].unlock();
}
}
}
}
还请注意,我在任何由中断例程读取或写入的变量上使用了volatile(除非该变量仅从中断中访问,如static bufferIndex 中的值__irqLongFunction)。我读过互斥锁消除了多线程代码中对volatile 的一些需求,但我认为这不适用于这里。 我使用了正确数量的volatile吗? 我用过:ExampleBuffer[].data[256]、ExampleBuffer[].index 和 ExampleCounter.value。
【问题讨论】:
-
你似乎把一件很简单的事情变得很复杂。不幸的是,这个问题太复杂了,我不确定我是否有时间解开它来提供建议。
-
@Clifford 我一直发现,考虑中断的所有可能性并尝试快速足够做事,这既复杂又容易出错。我希望将所有复杂性放入这个互斥锁类中,以便简化共享数据的使用。与其看我乱七八糟的源代码,不如看看你将如何处理标记为#1“中断是生产者......”的场景?
-
很公平 - 我正在考虑。