【问题标题】:RxJs indefinite observable callsRxJs 无限期可观察调用
【发布时间】:2019-07-15 10:46:55
【问题描述】:

我需要从服务器检索无限量的数据。这应该通过以下方式进行:

  1. 发送初始请求
  2. 检索部分数据并告诉服务器一切正常,我可以获取更多数据
  3. 重复第 2 步和第 3 步,直到我收到一个表示没有更多数据的特定值

如何使用 observables 做到这一点?

现在我只能想到一些类似函数的递归可观察调用。

const send = execSend() {
    this.send(message).subscribe(resp => {
        if (resp === 'end') {
            subscriber.next(byteArr.join(''));
            console.log('finished');
            subscriber.complete();
        } else {
            byteArr.push(resp);
            execSend();
        }
    });
}();

【问题讨论】:

  • 你想每次都发送相同的请求吗?
  • 是的。这只是告诉服务器我得到了我的价值并为下一个做好了准备。更准确地说,所有这些都是为了在收到所有内容时逐块检索大量数据以将其全部合并。由于使用的是 WebSocket,因此我们受限于数据大小限制。

标签: angular typescript rxjs angular2-observables


【解决方案1】:

类似这样的:

let todo = true;
interval(100).pipe(
    takeWhile(()=>todo),
    concatMap(()=>getStuff())
).subscribe(data => {
  todo = !isFinished(data);
});

【讨论】:

    【解决方案2】:

    未测试,但您可以尝试此模式

    exec=()=>http.get(....) 
    
    exec().pipe(
      expand((resp)=>exec()),
      takeWhile(resp=>resp !== 'end'),
      scan((acc,curr)=>acc.concat(curr),[])
    ).subscribe()
    

    【讨论】:

      【解决方案3】:

      以下应该复制您的想法:完成前发出 1 个事件。

      import { of } from 'rxjs';
      import { expand, takeWhile, reduce } from 'rxjs/operators';
      
      let count = 0;
      const FINISH = "finished";
      const limit = 5;
      const send$ = () => of(count++ < limit ? "sent" : FINISH);
      
      const expander$ = send$().pipe(
        expand(resp => send$()),
        takeWhile(resp => resp !== FINISH),
        reduce((acc, val) => acc ? acc + val : val, null)
      );
      
      const subscribe = expander$.subscribe(console.log);
      

      你可以在this blitz看到它工作

      【讨论】:

      • 如果我的可观察(发送)在传递一个值后立即完成,这不会崩溃吗?
      • 你能说得更具体点吗?
      • 我的 observable 发出值并在此之后立即完成订阅。
      • 这个例子也是这样做的,send$ 创建了一个 observable,它发出一个值并完成......
      • 谢谢,我会努力的。为什么send$ 是一个函数?不仅仅是send$ = of(...)
      猜你喜欢
      • 1970-01-01
      • 2019-10-18
      • 2016-05-17
      • 2017-06-11
      • 2016-10-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-09-05
      相关资源
      最近更新 更多