【问题标题】:Rate limiting http calls made by rxjs限制 rxjs 进行的 http 调用
【发布时间】:2016-04-29 14:13:22
【问题描述】:

我正在编写一项服务,人们可以在其中粘贴来自 Spotify 播放列表的网址,然后将播放列表导出到不同的服务中。对于粘贴在请求中的每个曲目 url,需要向 Spotify api 发出请求。

这段代码:

        Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
            .pluck<string>( "id" )
            .distinct()
            .flatMap(
                ( trackId ) => this.spotifyService.lookupTrack( trackId ).
                    catch( ( error ) => this.handleError( error ) ))
            .subscribe(
                ( result ) => this.handleTrackLookupResult( result ),
                ( error ) => this.handleError( error ),
                () => this.handleComplete()
            );
  1. 从 ITrackIdentifiers 列表创建一个 observable
  2. 采用轨道标识符的 id 来创建一个可观察的字符串 (id)
  3. 删除列表中的所有重复 ID
  4. 为每个对 spotify 进行的 http 调用创建一个 observable(并捕获错误)
  5. 使用平面图将所有这些可观察对象的结果合并到一个流中

这实际上工作正常,除非添加大量曲目。我的一个示例播放列表有超过 500 首曲目,因此立即进行了 500 次调用,浏览器需要处理它们/从缓存中返回项目,因此浏览器运行缓慢并锁定,并且当我超过 api 调用限制时,spotify 返回大量错误.

我希望只能同时运行 10 个呼叫。 Merge with maxConcurrent set 似乎是Stackoverflow 上讨论的完美解决方案。

看起来像这样:

        Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
            .pluck<string>( "id" )
            .distinct()
            .map(
                ( trackId ) => this.spotifyService.lookupTrack( trackId ).
                    catch( ( error ) => this.handleError( error ) ))
            .merge(10)
            .subscribe(
                ( result ) => this.handleTrackLookupResult( result ),
                ( error ) => this.handleError( error ),
                () => this.handleComplete()
            );

但它只是不起作用。在 chrome 网络调试器中,您可以看到同时进行的所有调用,并且大多数调用会排队很长时间,直到它们失败。

为什么这不起作用?我还能如何解决这个问题?

这是现阶段项目的Github checkin

【问题讨论】:

  • Merge 可以正常工作。它限制了订阅的数量。但是在“合并”之前,您有一个“地图”,它实际上会发出所有请求,然后才开始“合并”。

标签: javascript typescript rxjs


【解决方案1】:

使用merge 的代码的问题在于spotifyService.lookupTrack 不返回Observable,而是返回Promise。一些Observables 函数如flatMaphandle Promises as well,但ObservablePromise 之间的区别在于Observable 是惰性的,而Promise 不是。正如 user3743222 所建议的那样,您可以使用 Observable.defer 从 promise 工厂函数中创建一个惰性 observable。这个小示例使用 JavaScript 而不是 TypeScript,因此可以在此处运行。

console.log = x => {var d = document,b=d.body,p=d.createElement('pre'); p.style.margin = "0"; p.appendChild(d.createTextNode(''+x)); b.appendChild(p);  window.scrollTo(0, b.scrollHeight); };

function log_delay(timeout, value) {
  return new Promise(resolve => {
    console.log('Start: ' + value);
    setTimeout(() => {
      console.log('End: ' + value);
      resolve(value);
    }, timeout);
  });
}

Rx.Observable.range(0, 6)
.map(x => Rx.Observable.defer(
  () => log_delay(1000, x)
  .catch(e => console.log('Inner catch'))
))
.merge(2)
.subscribe(
  s => console.log('Result: ' + s),
  s => console.log('Error: ' + s),
  s => console.log('Complete')
);
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"&gt;&lt;/script&gt;

【讨论】:

【解决方案2】:

我设法让它按我想要的方式工作,但我仍然很好奇为什么合并不起作用。 这里构建了唯一 id 的列表,然后我们使用 concatMap 为每个 id 创建一个 Observable,然后等待延迟,然后再移动到下一个项目:

        Rx.Observable.fromArray<ITrackIdentifier>( this._allTracks )
            .pluck<string>( "id" )
            .distinct()
            .concatMap( ( id, index ) => Rx.Observable.interval( 50 ).take( 1 ).map( () => { return id } ) )
            .flatMap(
                ( trackId ) => this.spotifyService.lookupTrack( trackId ).
                catch( ( error ) => this.handleError( error ) ))
            .subscribe(
                ( result ) => this.handleTrackLookupResult( result ),
                ( error ) => this.handleError( error ),
                () => this.handleComplete()
            );

在此示例中,我在每次调用之间等待 50 毫秒。这大大减少了错误。

这是现阶段项目的Github checkin

【讨论】:

  • 您可以只使用() =&gt; id 而不是() =&gt; { return id; }
  • 另外,Rx.Observable.just(id).delay(50) 也会这样做
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-06
  • 1970-01-01
  • 1970-01-01
  • 2021-05-04
  • 2017-06-13
相关资源
最近更新 更多