【问题标题】:Synchronous stream of responses from a stream of requests with RxJS来自 RxJS 的请求流的同步响应流
【发布时间】:2016-02-03 20:27:58
【问题描述】:

我是 RxJS 的新手,想知道是否有人可以帮助我。

我想从请求流(有效负载数据)创建一个同步响应流(最好是相应的请求)。

我基本上希望请求一个一个发送,每个都等待最后一个的响应。

我试过这个,但它会一次发送所有内容(jsbin):

var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);

responseStream = requestStream.flatMap(
  sendRequest,
  (val, response)=>{ return {val, response}; }
);

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.err(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('result for '+val);},1000);
  });
};

以下方法在一定程度上有效,但不使用流作为请求数据 (jsbin)。

var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
  var sendNext = function(){
    var val = data.shift();
    if (!val) {
      observer.onCompleted();
      return;
    }
    sendRequest(val).then(response=>{
      observer.onNext({val, response});
      sendNext();
    });
  };
  sendNext();
});

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.err(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
  });
};

谢谢!

编辑:

澄清一下,这就是我想要实现的目标:

“发送 A,当你收到 A 的响应时,发送 B,当你收到 B 的响应时,发送 C,等等……”

按照 user3743222 的建议,使用 concatMap 和 defer 似乎可以做到(jsbin):

responseStream = requestStream.concatMap(
  (val)=>{
    return Rx.Observable.defer(()=>{
      return sendRequest(val);
    });
  },
  (val, response)=>{ return {val, response}; }
);

【问题讨论】:

    标签: javascript reactive-programming rxjs


    【解决方案1】:

    尝试在您的第一个代码示例中将flatMap 替换为concatMap,如果结果行为符合您的要求,请告诉我。

    responseStream = requestStream.concatMap(//I replaced `flatMap`
      sendRequest,
      (val, response)=>{ return {val, response}; }
    );
    

    基本上concatMapflatMap 具有相似的签名,不同之处在于它将等待当前的可观察对象被展平完成,然后再继续下一个。所以在这里:

    • requestStream 值将被推送到 concatMap 运算符。
    • concatMap 运算符将生成一个sendRequest 可观察对象,并且该可观察对象中的任何值(似乎是一个元组(val, response))都将通过选择器函数传递,其对象结果将传递给下游
    • sendRequest 完成时,会处理另一个 requestStream 值。
    • 总之,你的请求会被一一处理

    或者,也许您想使用defer 来推迟sendRequest 的执行。

    responseStream = requestStream.concatMap(//I replaced `flatMap`
      function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},
      (val, response)=>{ return {val, response}; }
    );
    

    【讨论】:

    • 感谢您的回复。我尝试了您的解决方案,但请求仍然全部立即发送。该文档表明 flatMap 可能会导致交错,而 concatMap 不会。似乎区别在于排序。使用 concatMap 是有意义的,但它仍然不会产生所需的行为:发送 A,当您收到 A 的响应时,发送 B,当您收到 B 的响应时,发送 C,等等。
    • 也许我误解了你想要的。你能在那种情况下试试defer吗?我会更新代码
    • @user3743222 我们怎样才能在发出网络请求之前发出一个类似“正在加载 url..”的字符串?
    猜你喜欢
    • 1970-01-01
    • 2020-03-16
    • 2014-07-07
    • 2016-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-26
    相关资源
    最近更新 更多