【问题标题】:Mutex Safety with Interrupts (Embedded Firmware)带中断的互斥安全(嵌入式固件)
【发布时间】:2015-02-09 02:53:50
【问题描述】:

编辑 @Mike 指出我在下面代码中的 try_lock 函数是不安全的,并且访问器的创建也会产生竞争条件。 (来自所有人的)建议使我确信我走错了路。

原始问题

锁定嵌入式微控制器的要求与多线程有很大不同,因此我无法将多线程示例转换为我的嵌入式应用程序。通常我没有任何类型的操作系统或线程,只有main 以及硬件定期调用的任何中断函数。

我需要从中断填充缓冲区是很常见的,但在main 中处理它。我在下面创建了IrqMutex 类来尝试安全地实现它。每个试图访问缓冲区的人都通过IrqMutexAccessor 分配了一个唯一的ID,然后他们每个人都可以try_lock()unlock()。阻塞lock() 函数的想法不适用于中断,因为除非您允许中断完成,否则没有其他代码可以执行,因此unlock() 代码永远不会运行。不过,我偶尔会使用main() 代码中的阻塞锁。

但是,我知道如果没有 C++11 内存屏障(在许多嵌入式平台上不可用),双重检查锁就无法工作。老实说,尽管阅读了很多关于它的内容,但我并不真正理解内存访问重新排序如何/为什么会导致问题。我认为 volatile sig_atomic_t 的使用(可能与唯一 ID 的使用结合使用)使得这与双重检查锁不同。但我希望有人可以:确认以下代码是正确的解释为什么它不安全,或提供更好的方法来完成此操作。

class IrqMutex {
friend class IrqMutexAccessor;

private:
    std::sig_atomic_t accessorIdEnum;
    volatile std::sig_atomic_t owner;
protected:
    std::sig_atomic_t nextAccessor(void) { return ++accessorIdEnum; }
    bool have_lock(std::sig_atomic_t accessorId) {
        return (owner == accessorId);
    }
    bool try_lock(std::sig_atomic_t accessorId) {
        // Only try to get a lock, while it isn't already owned.
        while (owner == SIG_ATOMIC_MIN) {
            // <-- If an interrupt occurs here, both attempts can get a lock at the same time.

            // Try to take ownership of this Mutex.
            owner = accessorId; // SET

            // Double check that we are the owner.
            if (owner == accessorId) return true;

            // Someone else must have taken ownership between CHECK and SET.
            // If they released it after CHECK, we'll loop back and try again.
            // Otherwise someone else has a lock and we have failed.
        }        

        // This shouldn't happen unless they called try_lock on something they already owned.
        if (owner == accessorId) return true;

        // If someone else owns it, we failed.
        return false;
    }
    bool unlock(std::sig_atomic_t accessorId) {
        // Double check that the owner called this function (not strictly required)
        if (owner == accessorId) {
            owner = SIG_ATOMIC_MIN;
            return true;
        }
        
        // We still return true if the mutex was unlocked anyway.
        return (owner == SIG_ATOMIC_MIN);
    }
public:
    IrqMutex(void) : accessorIdEnum(SIG_ATOMIC_MIN), owner(SIG_ATOMIC_MIN) {}
};

// This class is used to manage our unique accessorId.
class IrqMutexAccessor {
friend class IrqMutex;
private:
    IrqMutex& mutex;
    const std::sig_atomic_t accessorId;
public:
    IrqMutexAccessor(IrqMutex& m) : mutex(m), accessorId(m.nextAccessor()) {}
    bool have_lock(void) { return mutex.have_lock(accessorId); }
    bool try_lock(void) { return mutex.try_lock(accessorId); }
    bool unlock(void) { return mutex.unlock(accessorId); }
};

因为只有一个处理器,并且互斥体没有线程化,所以我认为它的用途与正常情况略有不同。我反复遇到两个主要用例。

  1. 中断是一个生产者,它拥有一个空闲缓冲区的所有权,并用一个数据包加载它。中断/生产者可能会长时间保持其所有权锁定,跨越多个中断调用。主要功能是消费者,当它准备好处理它时,它会拥有一个完整的缓冲区。竞争条件很少发生,但如果中断/生产者完成一个数据包并需要一个新缓冲区,但它们都已满,它将尝试使用最旧的缓冲区(这是一个丢包事件)。如果主/消费者在完全相同的时间开始读取和处理最旧的缓冲区,它们将互相践踏。
  2. 中断只是对某物(如计数器)的快速更改或增量。但是,如果我们想通过调用 main() 代码来重置计数器或跳转到某个新值,我们不想在计数器发生变化时尝试写入计数器。这里 main 实际上执行了一个阻塞循环来获取锁,但是我认为几乎不可能必须在这里实际等待两次以上的尝试。一旦它有一个锁,任何对计数器中断的调用都会被跳过,但这对于像计数器这样的东西来说通常不是什么大问题。然后我更新计数器值并解锁它,以便它可以再次开始递增。

我意识到这两个示例有点笨拙,但是这些模式的某些版本出现在我从事的每个项目的许多外围设备中,我希望有一段可重用的代码可以安全地处理各种嵌入式平台。我包括了 C 标记,因为所有这些都可以直接转换为 C 代码,并且在某些嵌入式编译器上,这就是所有可用的。所以我试图找到一种保证在 C 和 C++ 中都能工作的通用方法。

struct ExampleCounter {
    volatile long long int value;
    IrqMutex mutex;
} exampleCounter;

struct ExampleBuffer {
    volatile char data[256];
    volatile size_t index;
    IrqMutex mutex; // One mutex per buffer.
} exampleBuffers[2];

const volatile char * const REGISTER;

// This accessor shouldn't be created in an interrupt or a race condition can occur.
static IrqMutexAccessor myMutex(exampleCounter.mutex);
void __irqQuickFunction(void) {
    // Obtain a lock, add the data then unlock all within one function call.
    if (myMutex.try_lock()) {
        exampleCounter.value++;
        myMutex.unlock();
    } else {
        // If we failed to obtain a lock, we skipped this update this one time.
    }
}

// These accessors shouldn't be created in an interrupt or a race condition can occur.
static IrqMutexAccessor myMutexes[2] = {
    IrqMutexAccessor(exampleBuffers[0].mutex),
    IrqMutexAccessor(exampleBuffers[1].mutex)
};
void __irqLongFunction(void) {
    static size_t bufferIndex = 0;

    // Check if we have a lock.
    if (!myMutex[bufferIndex].have_lock() and !myMutex[bufferIndex].try_lock()) {
        // If we can't get a lock try the other buffer
        bufferIndex = (bufferIndex + 1) % 2;

        // One buffer should always be available so the next line should always be successful.
        if (!myMutex[bufferIndex].try_lock()) return;
    }
    
    // ... at this point we know we have a lock ...

    // Get data from the hardware and modify the buffer here.
    const char c = *REGISTER;
    exampleBuffers[bufferIndex].data[exampleBuffers[bufferIndex].index++] = c;

    // We may keep the lock for multiple function calls until the end of packet.
    static const char END_PACKET_SIGNAL = '\0';    
    if (c == END_PACKET_SIGNAL) {
        // Unlock this buffer so it can be read from main.
        myMutex[bufferIndex].unlock();

        // Switch to the other buffer for next time.
        bufferIndex = (bufferIndex + 1) % 2;
    }
}

int main(void) {
    while (true) {
        // Mutex for counter
        static IrqMutexAccessor myCounterMutex(exampleCounter.mutex);

        // Change counter value
        if (EVERY_ONCE_IN_A_WHILE) {
            // Skip any updates that occur while we are updating the counter.
            while(!myCounterMutex.try_lock()) {
                // Wait for the interrupt to release its lock.
            }

            // Set the counter to a new value.
            exampleCounter.value = 500;

            // Updates will start again as soon as we unlock it.
            myCounterMutex.unlock();
        }

        // Mutexes for __irqLongFunction.
        static IrqMutexAccessor myBufferMutexes[2] = {
            IrqMutexAccessor(exampleBuffers[0].mutex),
            IrqMutexAccessor(exampleBuffers[1].mutex)
        };

        // Process buffers from __irqLongFunction.
        for (size_t i = 0; i < 2; i++)  {
            // Obtain a lock so we can read the data.
            if (!myBufferMutexes[i].try_lock()) continue;
                // Check that the buffer isn't empty.
                if (exampleBuffers[i].index == 0) {
                    myBufferMutexes[i].unlock(); // Don't forget to unlock.
                    continue;
                }

                // ... read and do something with the data here ...
                exampleBuffer.index = 0;

                myBufferMutexes[i].unlock();
            }
        }
    }
}

还请注意,我在任何由中断例程读取或写入的变量上使用了volatile(除非该变量从中断中访问,如static bufferIndex 中的值__irqLongFunction)。我读过互斥锁消除了多线程代码中对volatile 的一些需求,但我认为这不适用于这里。 我使用了正确数量的volatile吗? 我用过:ExampleBuffer[].data[256]ExampleBuffer[].indexExampleCounter.value

【问题讨论】:

  • 你似乎把一件很简单的事情变得很复杂。不幸的是,这个问题太复杂了,我不确定我是否有时间解开它来提供建议。
  • @Clifford 我一直发现,考虑中断的所有可能性并尝试快速足够做事,这既复杂又容易出错。我希望将所有复杂性放入这个互斥锁类中,以便简化共享数据的使用。与其看我乱七八糟的源代码,不如看看你将如何处理标记为#1“中断是生产者......”的场景?
  • 很公平 - 我正在考虑。

标签: c++ c embedded


【解决方案1】:

对于冗长的答案,我深表歉意,但也许它适合一个长的问题。

要回答您的第一个问题,我想说您的 IrqMutex 实现并不安全。让我试着解释一下我发现问题的地方。

函数nextAccessor

std::sig_atomic_t nextAccessor(void) { return ++accessorIdEnum; }

这个函数有一个竞争条件,因为增量运算符不是原子的,尽管它是在一个标记为volatile 的原子值上。它涉及 3 个操作:读取accessorIdEnum 的当前值,将其递增,然后将结果写回。如果同时创建了两个IrqMutexAccessors,有可能它们都得到了相同的ID。

函数try_lock

try_lock 函数也有竞争条件。一个线程(例如主线程)可以进入while 循环,然后在获得所有权之前,另一个线程(例如中断)也可以进入while 循环并获得锁的所有权(返回true) .然后第一个线程可以继续,移动到owner = accessorId,从而“也”获得锁。因此,两个线程(或您的 main 线程和一个中断)可以同时在一个无主互斥体上 try_lock 并且都返回 true

通过 RAII 禁用中断

我们可以通过使用 RAII 进行中断禁用来实现某种程度的简单性和封装性,例如以下类:

class InterruptLock {
public:
    InterruptLock() { 
        prevInterruptState = currentInterruptState();
        disableInterrupts();
    }

    ~InterruptLock() { 
        restoreInterrupts(prevInterruptState);
    }
private:
    int prevInterruptState; // Whatever type this should be for the platform
    InterruptLock(const InterruptLock&); // Not copy-constructable
};

我建议禁用中断以在互斥锁实现本身中获得所需的原子性。例如:

bool try_lock(std::sig_atomic_t accessorId) {
    InterruptLock lock;
    if (owner == SIG_ATOMIC_MIN) {
        owner = accessorId;
        return true;
    }
    return false;
}
bool unlock(std::sig_atomic_t accessorId) {
    InterruptLock lock;
    if (owner == accessorId) {
        owner = SIG_ATOMIC_MIN;
        return true;
    }
    return false;
}

根据您的平台,这可能看起来不同,但您明白了。

正如您所说,这提供了一个平台,可以从通用代码中的禁用和启用中断中抽象出来,并将其封装到这一类中。

互斥体和中断

已经说过我将如何考虑实现互斥锁类,我实际上不会为您的用例使用互斥锁类。正如您所指出的,互斥锁不能很好地处理中断,因为中断不能“阻止”尝试获取互斥锁。出于这个原因,对于直接与中断交换数据的代码,我会强烈考虑直接禁用中断(在主“线程”接触数据时非常很短的时间)。

所以你的计数器可能看起来像这样:

volatile long long int exampleCounter;

void __irqQuickFunction(void) {
    exampleCounter++;
}
...
// Change counter value
if (EVERY_ONCE_IN_A_WHILE) {
    InterruptLock lock;
    exampleCounter = 500;
}

在我看来,这更容易阅读,更容易推理,并且在发生争用时不会“滑倒”(即错过计时器节拍)。

关于缓冲区用例,我强烈建议不要为多个中断周期持有锁。锁/互斥锁应该只保留“触摸”一块内存所需的最微小的时间——只要足够长的时间来读取或写入它。进来,出去。

这就是缓冲示例的外观:

struct ExampleBuffer {
    char data[256];
} exampleBuffers[2];

ExampleBuffer* volatile bufferAwaitingConsumption = nullptr;
ExampleBuffer* volatile freeBuffer = &exampleBuffers[1];

const volatile char * const REGISTER;

void __irqLongFunction(void) {

    static const char END_PACKET_SIGNAL = '\0';    
    static size_t index = 0;
    static ExampleBuffer* receiveBuffer = &exampleBuffers[0];

    // Get data from the hardware and modify the buffer here.
    const char c = *REGISTER;
    receiveBuffer->data[index++] = c;

    // End of packet?
    if (c == END_PACKET_SIGNAL) {
        // Make the packet available to the consumer
        bufferAwaitingConsumption = receiveBuffer;
        // Move on to the next buffer
        receiveBuffer = freeBuffer;
        freeBuffer = nullptr;
        index = 0;
    }
}


int main(void) {

    while (true) {

        // Fetch packet from shared variable
        ExampleBuffer* packet;
        {
            InterruptLock lock;
            packet = bufferAwaitingConsumption;
            bufferAwaitingConsumption = nullptr;
        }

        if (packet) {
            // ... read and do something with the data here ...

            // Once we're done with the buffer, we need to release it back to the producer
            {
                InterruptLock lock;
                freeBuffer = packet;
            }
        }
    }
}

这段代码可以说更容易推理,因为中断和主循环之间只有两个内存位置共享:一个将数据包从中断传递到主循环,一个将空缓冲区传递回中断.我们也只触及那些处于“锁定”状态的变量,并且只触及“移动”值所需的最短时间。 (为简单起见,当主循环释放缓冲区的时间过长时,我跳过了缓冲区溢出逻辑)。

确实,在这种情况下,甚至可能不需要锁,因为我们只是读取和写入简单的值,但是禁用中断的成本并不高,否则出错的风险是不值得的在我看来。

编辑

正如 cmets 中所指出的,上述解决方案仅用于解决多线程问题,并省略了溢出检查。这是更完整的解决方案,在溢出条件下应该是稳健的:

const size_t BUFFER_COUNT = 2; 

struct ExampleBuffer {
    char data[256];
    ExampleBuffer* next;
} exampleBuffers[BUFFER_COUNT];

volatile size_t overflowCount = 0;

class BufferList {
public:
    BufferList() : first(nullptr), last(nullptr) { }

    // Atomic enqueue
    void enqueue(ExampleBuffer* buffer) {
        InterruptLock lock;
        if (last)
            last->next = buffer;
        else {
            first = buffer;
            last = buffer;
        }
    }

    // Atomic dequeue (or returns null)
    ExampleBuffer* dequeueOrNull() {
        InterruptLock lock;
        ExampleBuffer* result = first;
        if (first) {
            first = first->next;
            if (!first)
                last = nullptr;
        }
        return result;
    }
private:
    ExampleBuffer* first;
    ExampleBuffer* last;
} freeBuffers, buffersAwaitingConsumption;

const volatile char * const REGISTER;

void __irqLongFunction(void) {

    static const char END_PACKET_SIGNAL = '\0';    
    static size_t index = 0;
    static ExampleBuffer* receiveBuffer = &exampleBuffers[0];

    // Recovery from overflow?
    if (!receiveBuffer) {
        // Try get another free buffer
        receiveBuffer = freeBuffers.dequeueOrNull();
        // Still no buffer?
        if (!receiveBuffer) {
            overflowCount++;
            return; 
        }
    }

    // Get data from the hardware and modify the buffer here.
    const char c = *REGISTER;

    if (index < sizeof(receiveBuffer->data))
        receiveBuffer->data[index++] = c;

    // End of packet, or out of space?
    if (c == END_PACKET_SIGNAL) {
        // Make the packet available to the consumer
        buffersAwaitingConsumption.enqueue(receiveBuffer);
        // Move on to the next free buffer
        receiveBuffer = freeBuffers.dequeueOrNull();
        index = 0;
    }
}

size_t getAndResetOverflowCount() {
    InterruptLock lock;
    size_t result = overflowCount;
    overflowCount = 0;
    return result;
}


int main(void) {

    // All buffers are free at the start
    for (int i = 0; i < BUFFER_COUNT; i++)
        freeBuffers.enqueue(&exampleBuffers[i]);

    while (true) {

        // Fetch packet from shared variable
        ExampleBuffer* packet = dequeueOrNull();

        if (packet) {
            // ... read and do something with the data here ...

            // Once we're done with the buffer, we need to release it back to the producer
            freeBuffers.enqueue(packet);
        }

        size_t overflowBytes = getAndResetOverflowCount();
        if (overflowBytes) {
            // ...
        }
    }
}

关键变化:

  • 如果中断用完空闲缓冲区,它将恢复
  • 如果中断在没有接收缓冲区的情况下接收数据,它将通过getAndResetOverflowCount 与主线程通信
  • 如果您不断收到缓冲区溢出,您可以简单地增加缓冲区计数
  • 我已将多线程访问封装到一个队列类中,该队列类实现为链表 (BufferList),它支持原子出队和入队。前面的示例也使用了队列,但长度为 0-1(一个项目入队或未入队),因此队列的实现只是一个变量。在空闲缓冲区用完的情况下,接收队列可能有 2 个项目,因此我将其升级为适当的队列,而不是添加更多共享变量。

【讨论】:

  • 在您的缓冲区示例中,如果 irq 在主代码完成第一个缓冲区之前完成了第二个缓冲区会发生什么?我认为 receiveBuffer 将被设置为 nullptr 并且下一个传入的数据包将被复制到无效的内存中。
  • @kkrambo 是的,完全正确。我的答案中确实有一条关于此答案没有溢出检查的注释-我一定是不小心删除了它。在这种情况下,溢出不是线程问题——多线程原则仍然适用。但为了完整起见,我将编辑我的答案以包括溢出处理。
【解决方案2】:

如果中断是生产者,而主线代码是消费者,那肯定就像在消费操作期间禁用中断一样简单吗?

这就是我过去在嵌入式微控制器时代的做法。

【讨论】:

  • 这确实是常规且通常足够的方法。它发生故障的地方是嵌套中断是可能的并且两个中断访问相同的非原子数据或资源;但到那时你真的应该考虑一个支持线程和互斥体的 RTOS 内核。
  • 对于计数器代码,禁用中断就足够了。然而,消费者通常不够快,无法考虑禁用中断。 (即使在没有嵌套中断且只有一个中断源的非常基本的设置中。)例如,我们每 40 毫秒接收一个字节,如果我们错过了一个字节,则整个数据包都已损坏。因此,您必须在 40 毫秒的窗口内处理整个数据包。
  • 在消费操作过程中,您不必处理数据包,只需将其排入队列即可。如果将数据包保存在单链表中(例如),则出队的成本是 2 次内存移动。
  • @AndyStangeland 如果您的数据速率是 40 毫秒内的 1 个字节,那么您几乎不需要担心。在通用桌面操作系统中可能会遇到这种延迟,但在裸机系统上,延迟可能接近于零,例如在 72MHz Cortex-M3 上,这段时间内可以执行大约 360 万条指令。即使你的意思是 40 微秒,也就是 3600 条指令。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-06-05
  • 2013-05-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多