【问题标题】:c++ lambda callback to trigger eventc++ lambda回调触发事件
【发布时间】:2017-07-19 20:57:34
【问题描述】:

我一直在尝试围绕 C++ 中的回调功能展开思考。我想要达到的目标如下:

我有两个对象,每个对象都有自己的线程。一个对象A 有一个指向第二个对象B 的指针。见例子:

class A
{
  public:
   // ...
  private:
   std::unique_ptr<B> b;
};

class B
{
  public:
   void add_message(MessageType msg);
   // ...
};

我想要实现的是让对象A 使用指向B 的指针添加一条消息,然后继续做其他事情,但是有一个回调或处理程序或当B 有回复时触发的东西到那个消息。 B 对消息进行一些处理,并可能将其传递给其他对象以在其自己的线程上进行处理,但最终会得到答复。那我怎么知道B什么时候回复了我的消息,例如:

// In class A
MessageType m();
b->add_message(m)
// A's thread continues doing other stuff
...
// some notification that b has a reply?

我知道我可能必须将 std::function 用于我想使用的回调,但是通过查看大量示例,我无法弄清楚如何做到这一点。感谢您提供任何帮助,并请注意,我查看了很多示例,但无法将其与我想要实现或不理解的目标联系起来......

【问题讨论】:

  • 您可能不希望线程能够相互访问。处理此类事情的常用方法是使用消息队列。一个线程输入消息,另一个线程输出消息。队列可以处理同步细节。 ZeroMQ 似乎是当今此类事物的流行库。
  • @Vincent 我已经在Bs 类中使用了一个队列。 B 拥有队列,A 通过 add_message(MessageType msg); 方法将消息添加到队列中。我迷路的地方是A 期待回复,那么它怎么知道什么时候有……?
  • 您需要一个互斥锁或其他一些同步机制来防止您的线程相互破坏内存。您不能在没有同步的情况下从不同线程读取/写入相同的内存(变量)。
  • 我不知道您将能够在消息处理过程中添加消息(除非您添加更多线程),这使得该方法主要是串行的。

标签: c++ c++11 lambda callback


【解决方案1】:

线程是执行序列。它们的行为大致类似于线性 C++ 程序,嵌入在一个内存模型中,使它们能够通信并注意到由其他执行线程引起的状态变化。

如果没有线程的合作,对线程的回调不能接管执行序列。您要通知的线程必须明确检查消息是否已到达并进行处理。


有两种常见的方式来处理消息的响应。

第一个是std::futurelike 方法。在其中,调用者获得某种令牌,该令牌代表将来可能或将要产生的答案。

第二个是再次使用消息传递。您向 B 发送消息请求响应。 B 将包含响应的消息发送回 A。与 B 接收消息的方式相同,A 也接收消息。该消息可能包含某种“返回目标”以帮助 A 将其链接到原始消息。

在基于消息的系统中,通常有一个“事件循环”。您有一个重复返回“事件循环”的线程,而不是一个大型的线性程序。它在那里检查消息队列,如果没有则等待一些消息。

在这样的系统下,任务必须被分解成小块,以便您经常检查事件循环以做出响应。

实现这一点的一种方法是使用协程,这是一种不拥有自己的执行程序的执行状态(就像一个线程,它拥有两者)。协程会定期放弃优先级并“保存其状态以备后用”。


未来的解决方案通常是最简单的,但它依赖于 A 定期检查响应。

首先,threaded_queue&lt;T&gt;,它允许任意数量的生产者和消费者将事物传递到队列中并从前面吃掉它们:

template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.back());
    data.pop_back();
    return std::move(r);
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};

现在,我们要将任务传递到这样的队列中,并让传递任务的人返回结果。这是将上述内容与打包任务一起使用:

template<class...Args>
struct threaded_task_queue {
  threaded_task_queue() = default;
  threaded_task_queue( threaded_task_queue&& ) = delete;
  threaded_task_queue& operator=( threaded_task_queue&& ) = delete;
  ~threaded_task_queue() = default;
  template<class F, class R=std::result_of_t<F&(Args...)>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R(Args...)> p(std::move(task));
    auto r = p.get_future();
    tasks.push_back( std::move(p) );
    return r;
  }
  void terminate() {
    tasks.terminate();
  }
  std::function<void(Args...)> pop_task() {
    auto task = tasks.pop_front();
    if (!task) return {};
    auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task));
    return [task_ptr](Args...args){
      (*task_ptr)(std::forward<Args>(args)...);
    };
  }
private:
  threaded_queue<std::packaged_task<void(Args...)>> tasks;
};

如果我做对了,它会像这样工作:

  • A 以 lambda 的形式向 B 发送任务队列。这个 lambda 接受一些固定的参数集(由 B 提供),并返回一些值。

  • B 弹出队列,并获得一个接受参数的std::function。它调用它;它在 B 的上下文中返回 void

  • A 在排队任务时获得了future&lt;R&gt;。它可以查询它以查看它是否完成。

你会注意到 A 不能被“通知”事情已经完成。这需要不同的解决方案。但是,如果 A 最终到达了不等待 B 的结果就无法前进的地步,那么这个系统就可以工作了。

另一方面,如果 A 积累了大量此类消息,并且有时需要等待许多此类 B 的输入,直到其中任何一个返回数据(或用户执行某项操作),那么您需要比 a 更高级的东西std::future&lt;R&gt;。一般模式——拥有一个代表未来要交付的计算的令牌——是可靠的。但是您需要对其进行扩充,以使其与未来计算和消息循环等的多个来源很好地配合。

代码未经测试。

threaded_task_queue 发送消息时的一种方法是:

template<class Signature>
struct message_queue;
template<class R, class...Args>
struct message_queue<R(Args...) :
  threaded_task_queue< std::function<R(Args...)> >
{
  std::future<R> queue_message(Args...args) {
    return this->queue_task(
      [tup = std::make_tuple(std::forward<Args>(args)...)]
      ( std::function<R(Args...)> f ) mutable
      {
        return std::apply( f, std::move(tup) );
      }
    );
  }
  bool consume_message( std::function<R(Args...)> f )
  {
    auto task = pop_task();
    if (!task) return false;
    task( std::move(f) );
    return true;
  }
};

在提供者端,您提供Args...,在消费者端,您使用Args... 并返回R,在提供者端,您有一个future&lt;R&gt;,一旦消费者是完成。

这可能比我写的原始@​​987654335@ 更自然。

std::apply 是 C++17,但也有 C++11 和 C++14 的实现。

【讨论】:

  • cv.notify_one() 保留在函数'threaded_queue::push_back` 中的原因是什么?
  • @Steephen notify_one 不需要您持有互斥锁。所以我没有。在某些实现中,这可能会有所帮助,因为监听线程不会被通知唤醒,然后阻止立即获取互斥锁。
  • 知道了。非常感谢您的澄清。
猜你喜欢
  • 2019-07-17
  • 1970-01-01
  • 1970-01-01
  • 2018-03-17
  • 2015-10-29
  • 2018-05-18
  • 1970-01-01
  • 1970-01-01
  • 2012-10-01
相关资源
最近更新 更多