线程是执行序列。它们的行为大致类似于线性 C++ 程序,嵌入在一个内存模型中,使它们能够通信并注意到由其他执行线程引起的状态变化。
如果没有线程的合作,对线程的回调不能接管执行序列。您要通知的线程必须明确检查消息是否已到达并进行处理。
有两种常见的方式来处理消息的响应。
第一个是std::futurelike 方法。在其中,调用者获得某种令牌,该令牌代表将来可能或将要产生的答案。
第二个是再次使用消息传递。您向 B 发送消息请求响应。 B 将包含响应的消息发送回 A。与 B 接收消息的方式相同,A 也接收消息。该消息可能包含某种“返回目标”以帮助 A 将其链接到原始消息。
在基于消息的系统中,通常有一个“事件循环”。您有一个重复返回“事件循环”的线程,而不是一个大型的线性程序。它在那里检查消息队列,如果没有则等待一些消息。
在这样的系统下,任务必须被分解成小块,以便您经常检查事件循环以做出响应。
实现这一点的一种方法是使用协程,这是一种不拥有自己的执行程序的执行状态(就像一个线程,它拥有两者)。协程会定期放弃优先级并“保存其状态以备后用”。
未来的解决方案通常是最简单的,但它依赖于 A 定期检查响应。
首先,threaded_queue<T>,它允许任意数量的生产者和消费者将事物传递到队列中并从前面吃掉它们:
template<class T>
struct threaded_queue {
using lock = std::unique_lock<std::mutex>;
void push_back( T t ) {
{
lock l(m);
data.push_back(std::move(t));
}
cv.notify_one();
}
boost::optional<T> pop_front() {
lock l(m);
cv.wait(l, [this]{ return abort || !data.empty(); } );
if (abort) return {};
auto r = std::move(data.back());
data.pop_back();
return std::move(r);
}
void terminate() {
{
lock l(m);
abort = true;
data.clear();
}
cv.notify_all();
}
~threaded_queue()
{
terminate();
}
private:
std::mutex m;
std::deque<T> data;
std::condition_variable cv;
bool abort = false;
};
现在,我们要将任务传递到这样的队列中,并让传递任务的人返回结果。这是将上述内容与打包任务一起使用:
template<class...Args>
struct threaded_task_queue {
threaded_task_queue() = default;
threaded_task_queue( threaded_task_queue&& ) = delete;
threaded_task_queue& operator=( threaded_task_queue&& ) = delete;
~threaded_task_queue() = default;
template<class F, class R=std::result_of_t<F&(Args...)>>
std::future<R> queue_task( F task ) {
std::packaged_task<R(Args...)> p(std::move(task));
auto r = p.get_future();
tasks.push_back( std::move(p) );
return r;
}
void terminate() {
tasks.terminate();
}
std::function<void(Args...)> pop_task() {
auto task = tasks.pop_front();
if (!task) return {};
auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task));
return [task_ptr](Args...args){
(*task_ptr)(std::forward<Args>(args)...);
};
}
private:
threaded_queue<std::packaged_task<void(Args...)>> tasks;
};
如果我做对了,它会像这样工作:
A 以 lambda 的形式向 B 发送任务队列。这个 lambda 接受一些固定的参数集(由 B 提供),并返回一些值。
B 弹出队列,并获得一个接受参数的std::function。它调用它;它在 B 的上下文中返回 void。
A 在排队任务时获得了future<R>。它可以查询它以查看它是否完成。
你会注意到 A 不能被“通知”事情已经完成。这需要不同的解决方案。但是,如果 A 最终到达了不等待 B 的结果就无法前进的地步,那么这个系统就可以工作了。
另一方面,如果 A 积累了大量此类消息,并且有时需要等待许多此类 B 的输入,直到其中任何一个返回数据(或用户执行某项操作),那么您需要比 a 更高级的东西std::future<R>。一般模式——拥有一个代表未来要交付的计算的令牌——是可靠的。但是您需要对其进行扩充,以使其与未来计算和消息循环等的多个来源很好地配合。
代码未经测试。
threaded_task_queue 发送消息时的一种方法是:
template<class Signature>
struct message_queue;
template<class R, class...Args>
struct message_queue<R(Args...) :
threaded_task_queue< std::function<R(Args...)> >
{
std::future<R> queue_message(Args...args) {
return this->queue_task(
[tup = std::make_tuple(std::forward<Args>(args)...)]
( std::function<R(Args...)> f ) mutable
{
return std::apply( f, std::move(tup) );
}
);
}
bool consume_message( std::function<R(Args...)> f )
{
auto task = pop_task();
if (!task) return false;
task( std::move(f) );
return true;
}
};
在提供者端,您提供Args...,在消费者端,您使用Args... 并返回R,在提供者端,您有一个future<R>,一旦消费者是完成。
这可能比我写的原始@987654335@ 更自然。
std::apply 是 C++17,但也有 C++11 和 C++14 的实现。