【问题标题】:How to restrict number of API calls per interval with RxJs?如何使用 RxJs 限制每个间隔的 API 调用次数?
【发布时间】:2018-10-17 05:32:09
【问题描述】:

我正在开发一个 Angular 应用程序。而且我使用其中一个社交网络提供的 API,它的限制是每秒 5 次 API 调用。

最直接的解决方案是编写自定义逻辑来计算请求并将它们排队到限制。因此,如果我在一秒内向 API 发送第 6 个请求,它将在第一个请求发送后的第二个发送。

但如果可以使用 RxJs,我想找到一些优雅的解决方案。

例如,我可以为 Observable 设置 debounseTime,如下例所示。但我实际得到的是,我无法在行中以小于 200 毫秒的间隔发出几个请求。

this.searchControl.valueChanges
    .debounceTime(200) // 200ms ~ 5 requests per second 
    .switchMap(search => this.api.searchPeople(search))

RxJ 是否有任何技术可以限制每个间隔的发射次数并将它们排队以防请求发送过于频繁?

【问题讨论】:

    标签: angular typescript rxjs


    【解决方案1】:

    如果您对更多 Angular 方法感兴趣,这里是 Angular 6 和 RxJS 6 的拦截器(基于原始答案):

    @Injectable()
    export class RequestRateLimitInterceptor implements HttpInterceptor {
    
      private readonly timeFrame = 1000 /* 1 second time frame */;
    
      private availableThreads = 10 /* 10 requests per the time frame */;
    
      private rateLimit$ = new BehaviorSubject(this.availableThreads);
    
      intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
        return this.rateLimit$.pipe(
          filter(() => this.availableThreads > 0),
          take(1),
          tap(() => this.removeThreads()),
          tap(() => timer(this.timeFrame).subscribe(() => this.addThreads())),
          mergeMap(() => next.handle(req))
        );
      }
    
      private addThreads(): void {
        this.changeThreads(1);
      }
    
      private removeThreads(): void {
        this.changeThreads(-1);
      }
    
      private changeThreads(value: number): void {
        this.rateLimit$.next(this.availableThreads += value);
      }
    }
    

    【讨论】:

      【解决方案2】:

      您可以跟踪最近调用该 api 的次数。因此,如果您每秒可以进行 5 次调用,则意味着您有 5 个令牌,如果一个令牌被消耗,那么它将在 1 秒后更新。我制作了以下运算符来满足您的需要:

      Observable.prototype.rateLimit = function (count: number, slidingWindowTime: number, scheduler = async) {
        let tokens = count;
        const tokenChanged = new BehaviorSubject(tokens);
        const consumeToken = () => tokenChanged.next(--tokens);
        const renewToken = () => tokenChanged.next(++tokens);
        const availableTokens = tokenChanged.filter(() => tokens > 0);
      
        return this.mergeMap(value =>
          availableTokens
          .take(1)
          .map(() => {
            consumeToken();
            Observable.timer(slidingWindowTime, scheduler).subscribe(renewToken);
            return value;
          }));
      }
      
      declare module 'rxjs/Observable' {
        interface Observable < T > {
          rateLimit(count: number, slidingWindowTime: number, scheduler ? : Scheduler): Observable < T >
        }
      }

      【讨论】:

      • 太棒了。非常感谢,这正是我想要的。
      猜你喜欢
      • 2017-07-04
      • 2021-12-30
      • 1970-01-01
      • 1970-01-01
      • 2017-03-13
      • 1970-01-01
      • 2021-01-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多