【问题标题】:How to handle a request/response stream in rxcpp如何在 rxcpp 中处理请求/响应流
【发布时间】:2017-07-11 10:56:25
【问题描述】:

我需要在 rxcpp 中实现一个摄像头采样系统。我想象的方式是将 requestStream 作为参数传递并接收 responseStream

每次调用 requestSample 时,都会创建一个新的摄像头会话,当为 requestStream 调用 on_complete() 时,摄像头会话就会终止

observable<ImageSample> requestSampleStream(observable<ImageRequest> requestStream$) {
  auto response$ = rxcpp::observable<>::create<ImageSample>(
  [&](rxcpp::subscriber<ImageSample> s){
    auto request_next = [&](ImageRequest imageRequest) {
      cout << "image request next" << endl;
      SampleImage vsi;
      s.on_next(vsi);
    };

    auto request_completed = [&] {
      cout << "no more samples needed.. close camera" << endl;
      s.on_completed();
    };

    auto request_error = [&](std::exception_ptr e) {
      try { rethrow_exception(e); }
      catch (const exception &ex) {
        cerr << "error happened on request stream.. close the camera and send error on return stream" << endl << ex.what() << endl;
      }

      s.on_error(e);
    };

    requestStream$.subscribe(request_next,
                              request_error,
                              request_completed
    );
  });

  return response$;
}

这里的问题是,在调用 requestStream$.subscribe(request_next.. 时,我得到了一个 EXC_BAD_ACCESS

【问题讨论】:

    标签: c++ c++11 reactive-programming rxcpp


    【解决方案1】:

    s 是一个本地堆栈变量,但 lambdas 正在捕获对s 的引用。在调用 lambda 时,此引用无效。将[&amp;] 更改为[=],这应该可以工作!

    另一种选择是使用现有的map 算法。

    observable<ImageSample> requestSampleStream(observable<ImageRequest> requestStream$) {
        return requestStream$ |
            map([](ImageRequest imageRequest){
                cout << "image request next" << endl;
                SampleImage vsi;
                s.on_next(vsi);
            })
            // optional
            | tap([&](std::exception_ptr e) {
                try { rethrow_exception(e); }
                catch (const exception &ex) {
                    cerr << "error happened on request stream.. close the camera and send error on return stream" << endl << ex.what() << endl;
                }
            },
            [](){
                cout << "no more samples needed.. close camera" << endl;
            })
            ;
    }
    

    【讨论】:

    • 现货.. rxcpp-ish 的方法是什么?通过地图+点击?
    • 是的,使用现有的地图算法比使用create重现地图要好。
    • 我唯一的问题是map的结果是异步的,所以map不能同步返回..有解决办法吗?
    • 通常flat_map用于异步结果。
    • 'flat_map' 只是 'map() | merge()' 其中地图返回一个 'observable'
    猜你喜欢
    • 2011-02-13
    • 2021-12-02
    • 2021-10-12
    • 2020-04-12
    • 2013-06-19
    • 1970-01-01
    • 2016-12-27
    • 2013-06-22
    • 2015-04-26
    相关资源
    最近更新 更多