这是一个带有队列计数器的并发门的 API,以及“休眠”使用它的想法。
struct SleepyDoorQueue {
void UseDoor() {
auto l = lock();
++queue_size;
cv.notify_all();
cv.wait( l, [&]{ return open; } );
--queue_size;
}
// sleeps for a while, then tries to open the door.
// considered in queue while sleeping.
template<class Rep, class Period>
void SleepyUseDoor( const std::chrono::duration<Rep, Period>& rel_time ) {
{
auto l = lock();
++queue_size;
cv.notify_all();
}
std::this_thread::sleep_for(rel_time);
auto l = lock();
cv.wait( l, [&]{ return open; } );
--queue_size;
}
void CloseDoor() {
auto l = lock();
open = false;
}
void OpenDoor() {
auto l = lock();
open = true;
cv.notify_all();
}
void WaitForQueueSize(std::size_t n) const {
auto l = lock();
cv.wait(l, [&]{ return queue_size >= n; } );
}
explicit SleepyDoorQueue( bool startOpened = true ):open(startOpened) {}
private:
std::condition_variable cv;
mutable std::mutex m;
std::size_t queue_size = 0;
bool open = true;
auto lock() const { return std::unique_lock(m); }
};
主线程关门,等待队列大小为 1 以确保工作线程不工作。
工作线程在休眠 100ms 后执行SleepyUseDoor 尝试打开它。
当工作线程可以工作时,主线程才开门。
如果有大量工作线程和控制器线程,这将是低效的,因为我对队列和开门消息使用相同的 cv。所以一个会导致其他线程虚假地唤醒。使用一个工作线程和一个控制器线程,消息在任何程度上都不会是虚假的。
我只在队列大小增加和开门时通知,但我故意发出超过 1 个通知(如果有人在等待队列大小更改并且开门者吃掉它,那会很糟糕)。
实际上,您可能可以用两扇门来实现这一点。
struct Door {
// blocks until the door is open
void UseDoor() const {
auto l = lock();
cv.wait(l, [&]{ return open; });
}
// opens the door. Notifies blocked threads trying to use the door.
void OpenDoor() {
auto l = lock();
open = true;
cv.notify_all();
}
// closes the door.
void CloseDoor() {
auto l = lock();
open = false;
}
explicit Door(bool startOpen=true):open(startOpen) {}
private:
std::condition_variable cv;
mutable std::mutex m;
bool open = true;
auto lock() const { return std::unique_lock(m); }
};
工作线程这样做:
Door AmNotWorking(true);
Door CanWork(true);
void work() {
for(;;) {
canWork.UseDoor()
AmNotWorking.CloseDoor();
// work
AmNotWorking.OpenDoor();
std::this_thread::sleep_for(100ms);
}
}
控制器线程会:
void preventWork() {
CanWork.CloseDoor();
AmNotWorking.UseDoor();
}
void allowWork() {
CanWork.OpenDoor();
}
但我在那里看到了竞争条件;在CanWork.UseDoor() 和AmNotWorking.OpenDoor() 之间;有人可以关闭CanWork 门然后阅读AmNotWorking 门。我们需要它是原子的。
// Goes through the door when it is open.
// atomically runs the lambda passed in while the
// mutex is locked with checking the door state.
// WARNING: this can cause deadlocks if you do the
// wrong things in the lambda.
template<class F>
void UseDoor(F atomicWhenOpen) const {
auto l = lock();
cv.wait(l, [&]{ return open; });
atomicWhenOpen();
}
当我们成功使用门时,它会执行原子操作。有点危险,但工作线程现在可以:
void work() {
for(;;) {
canWork.UseDoor([]{AmNotWorking.CloseDoor();});
// work
AmNotWorking.OpenDoor();
std::this_thread::sleep_for(100ms);
}
}
这保证了我们将“AmNotWorking”门关闭在同一个锁中,因为我们验证了“CanWork”门是打开的。
void preventWork() {
CanWork.CloseDoor();
AmNotWorking.UseDoor();
}
如果“use can work and close am working”操作发生在CanWork.CloseDoor()之前,我们将无法AmNotWorking.UseDoor()直到工作线程完成他们的工作。
如果发生在CanWork.CloseDoor()之后,那么AmNotWorking.UseDoor()就关闭了,所以我们再次等到工作线程不工作。
我们不能 CanWork.CloseDoor() 在使用 can work 门和关闭 AmNotWorking 之间,这是额外的原子 lambda 回调给我们的。
我们可能可以制作一个不那么危险的原语,但我不确定如何优雅地完成它。
也许是一个简单的信号量?
template<class T = std::ptrdiff_t>
struct Semaphore {
void WaitUntilExactValue( T t ) const {
auto l = lock();
cv.wait( l, [&]{ return value==t; }
}
void WaitUntilAtLeastValue( T t ) const {
auto l = lock();
cv.wait( l, [&]{ return value>=t; }
}
void WaitUntilAtMostValue( T t ) const {
auto l = lock();
cv.wait( l, [&]{ return value<=t; }
}
void Increment() {
auto l = lock();
++value;
cv.notify_all();
}
void BoundedIncrement(T ceil) {
auto l = lock();
cv.wait(l, [&]{ return value+1 <= ceil; });
++value;
cv.notify_all();
}
void Decrement() {
auto l = lock();
--value;
cv.notify_all();
}
void BoundedDecrement(T floor) {
auto l = lock();
cv.wait(l, [&]{ return value-1 >= floor; });
--value;
cv.notify_all();
}
explicit Semaphore( T in = 0 ):value(std::forward<T>(in)) {}
private:
std::condition_variable cv;
mutable std::mutex m;
T value = 0;
auto lock() const; // see above
};
然后
Semaphore workLimit(1);
void work() {
for(;;) {
workLimit.BoundedDecrement(0);
// work
workLimit.Increment();
std::this_thread::sleep_for(100ms);
}
}
void preventWork() {
workLimit.Decrement();
workLimit.WaitUntilExactValue(0);
}
void allowWork() {
workLimit.Increment();
}
这里,workLimit 是此时允许有多少工人工作。开头是1。
当工人正在工作但不允许工作时,它是-1。当它工作并被允许时,它是0。当它处于睡眠状态并允许工作时,它是1。当它处于休眠状态时(或者因为它处于休眠状态,或者有界递减)并且不允许工作,它是0。