【问题标题】:RXJS how to delay each observable (http request) and combine all requests outputRXJS 如何延迟每个 observable(http 请求)并合并所有请求输出
【发布时间】:2019-09-28 05:33:17
【问题描述】:

我在一个 observable 中设置 http 请求之间的间隔时遇到问题。 事情是 - 我想在每个 http 请求之间有延迟,等待所有请求完成,并对来自所有请求的组合数据执行操作。 当然文档数组的长度是未知的。

示例代码:

const documents = [
  {
    'documentId': 1,
    'documentName': 'DocName1'
  },
  {
    'documentId': 2,
    'documentName': 'DocName2'
  },
];

function saveDocumentService(document) {
  document['someAnotherData'] = '123';

  return of(document); // in real world http.post()
}

const documentsToSave$ = zip(
  documents,
  interval(200),
  document => {
    document['someDataToBeInserted'] = {'data': 123};
    return saveDocumentService(document);
  }
);

const sub = forkJoin(documentsToSave$).subscribe(documents => {
  console.log('All documents uploaded', documents); // array of responses
});

使用这种方法,只输出最后一个值。

谢谢。

【问题讨论】:

    标签: angular rxjs observable rxjs6


    【解决方案1】:

    您可以将merge 与计时器一起使用并忽略所述计时器的输出:

    import { of, from, timer } from 'rxjs'
    import { concatMap, merge, ignoreElements, tap, toArray } from 'rxjs/operators'
    
    const documents = [
      {
        'documentId': 1,
        'documentName': 'DocName1'
      },
      {
        'documentId': 2,
        'documentName': 'DocName2'
      },
    ];
    
    function saveDocumentService(document) {
      document['someAnotherData'] = '123';
      return of(document)
    }
    from(documents)
      .pipe(
        concatMap(url => saveDocumentService(url).pipe(
          tap(res => console.log('Saved document...')),
          merge(timer(1000).pipe(ignoreElements()))
        )),
        toArray(),
      )
      .subscribe(documents => {
        console.log('Sub:', documents)
      })
    

    Stackblitz

    【讨论】:

      【解决方案2】:

      由于您有 Observable<Observable<any>> 类型的 documentsToSave$,将 forkJoin 替换为 mergeAlltoArray

      const sub = documentsToSave$.pipe(mergeAll(), toArray()).subscribe(documents => {
        console.log('All documents uploaded', documents); // array of responses
      });
        }
      

      here is working example

      【讨论】:

        【解决方案3】:

        您可以使用rxjs pipe 运算符将takeinterval 组合为

        counter: number = 0;
        items: string[] = ["one", "tow", "three", "four"];
        
          ngOnInit() {
            interval(2000)
              .pipe(take(this.items.length))
              .subscribe(res => {
                console.log(console.log(this.items[this.counter++]));
              });
          }
        

        编辑:您需要使用Observable.create 创建一个 Observable 流,您可以在其中使用setInterval 定期创建 observables,最后,您可以将其标记为完成。 p>

        订阅后,一旦发出新值和可观察对象标记为完成,我们将执行单独的方法

        obs: Observable<any>;
        counter: number = 0;
        items: string[] = ["one", "two", "three", "four"];
        
        
        
        ngOnInit() {
            this.obs = Observable.create(observer => {
              let intervalID = setInterval(() => {
                observer.next(this.items[this.counter++]);
                if (this.counter >= this.items.length) {
                  clearInterval(intervalID);
                  observer.complete();
                }
              }, 1000);
            });
        
            this.obs.subscribe(
              res => {
                console.log(res);
              },
              err => {
                console.log(`Error: ${err}`);
              },
              () => {
                console.log("complete");
              }
            );
          }
        

        Stackblitz:https://stackblitz.com/edit/angular-regular-interval-observables-with-complete

        【讨论】:

        • 使用您的方法,每次items 元素被触发时,订阅都会输出。我想等待所有请求完成以在订阅中处理它们。
        • @Klepstryk 请查看我的更新答案。希望对你有帮助
        • 好的,在您更新的代码中,订阅仍然会触发每个发送的项目,完成时当然会触发一次,但不会合并所有响应。
        【解决方案4】:

        尝试使用combineLatest。但请注意,在每个 observable 发出至少一个值之前, combineLatest 不会发出初始值。您可以查看文档以获取更多信息: combineLatest

        【讨论】:

        • 再次 - 使用您的方法,每次触发 items 元素时,subscribe 都会输出。我想等待所有请求完成以在订阅中处理它们。
        猜你喜欢
        • 2017-08-12
        • 2019-02-13
        • 1970-01-01
        • 2022-01-25
        • 2022-11-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多