【问题标题】:How do I rate limit requests losslessly using RxJS 5如何使用 RxJS 5 无损地对限制请求进行评级
【发布时间】:2017-02-16 20:31:44
【问题描述】:

我想向服务器发出一系列请求,但服务器的硬速率限制为每秒 10 个请求。如果我尝试循环发出请求,它将达到速率限制,因为所有请求将同时发生。

for(let i = 0; i < 20; i++) {
  sendRequest();
}

ReactiveX 有很多用于修改可观察流的工具,但我似乎找不到实现速率限制的工具。我尝试添加标准延迟,但请求仍然同时触发,仅比之前晚了 100 毫秒。

const queueRequest$ = new Rx.Subject<number>();

queueRequest$
  .delay(100)
  .subscribe(queueData => {
    console.log(queueData);
  });

const queueRequest = (id) => queueRequest$.next(id);

function fire20Requests() {
  for (let i=0; i<20; i++) {
    queueRequest(i);
  }
}

fire20Requests();
setTimeout(fire20Requests, 1000);
setTimeout(fire20Requests, 5000);

debounceTimethrottleTime 运算符也与我正在寻找的类似,但这是有损而不是无损的。我想保留我提出的每个请求,而不是丢弃之前的请求。

...
queueRequest$
  .debounceTime(100)
  .subscribe(queueData => {
    sendRequest();
  });
...

如何在不超过使用 ReactiveX 和 Observables 的速率限制的情况下向服务器发出这些请求?

【问题讨论】:

    标签: javascript rxjs reactive-programming


    【解决方案1】:

    OP 的self answer(和linked blog)中的实现总是会产生不理想的延迟。

    如果限速服务允许每秒 10 个请求,则应该可以在 10 毫秒内发出 10 个请求,只要在另外 990 毫秒内没有发出下一个请求。

    下面的实现应用了可变延迟以确保强制执行限制,并且延迟仅适用于超出限制的请求。

    function rateLimit(source, count, period) {
    
      return source
        .scan((records, value) => {
    
          const now = Date.now();
          const since = now - period;
    
          // Keep a record of all values received within the last period.
    
          records = records.filter((record) => record.until > since);
          if (records.length >= count) {
    
            // until is the time until which the value should be delayed.
    
            const firstRecord = records[0];
            const lastRecord = records[records.length - 1];
            const until = firstRecord.until + (period * Math.floor(records.length / count));
    
            // concatMap is used below to guarantee the values are emitted
            // in the same order in which they are received, so the delays
            // are cumulative. That means the actual delay is the difference
            // between the until times.
    
            records.push({
              delay: (lastRecord.until < now) ?
                (until - now) :
                (until - lastRecord.until),
              until,
              value
            });
          } else {
            records.push({
              delay: 0,
              until: now,
              value
            });
          }
          return records;
    
        }, [])
        .concatMap((records) => {
    
          const lastRecord = records[records.length - 1];
          const observable = Rx.Observable.of(lastRecord.value);
          return lastRecord.delay ? observable.delay(lastRecord.delay) : observable;
        });
    }
    
    const start = Date.now();
    rateLimit(
      Rx.Observable.range(1, 30),
      10,
      1000
    ).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));
    &lt;script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"&gt;&lt;/script&gt;

    【讨论】:

    • 仅供参考,我已经更新了这个答案。 until 计算不是最优的;它应该基于第一条记录——而不是now
    【解决方案2】:

    This blog post 很好地解释了 RxJS 擅长丢弃事件,以及它们是如何得出答案的,但最终,您正在寻找的代码是:

    queueRequest$
      .concatMap(queueData => Rx.Observable.of(queueData).delay(100))
      .subscribe(() => {
        sendRequest();
      });
    

    concatMap 将新创建的 observable 添加到 observable 流的后面。此外,使用delay 将事件推回 100 毫秒,允许每秒发生 10 个请求。 You can view the full JSBin here, which logs to the console instead of firing requests.

    【讨论】:

      【解决方案3】:

      我编写了一个库来执行此操作,您设置了每个间隔的最大请求数,并通过延迟订阅来限制 observables。它已经过测试并带有示例:https://github.com/ohjames/rxjs-ratelimiter

      【讨论】:

        【解决方案4】:

        实际上,使用bufferTime() 运算符及其三个参数有一种更简单的方法:

        bufferTime(bufferTimeSpan, bufferCreationInterval, maxBufferSize)
        

        这意味着我们可以使用bufferTime(1000, null, 10),这意味着我们将在最多 1 秒后发出最多 10 个项目的缓冲区。 null 表示我们想在当前缓冲区发出后立即打开一个新缓冲区。

        function mockRequest(val) {
          return Observable
            .of(val)
            .delay(100)
            .map(val => 'R' + val);
        }
        
        Observable
          .range(0, 55)
          .concatMap(val => Observable.of(val)
            .delay(25) // async source of values
            // .delay(175)
          )
        
          .bufferTime(1000, null, 10) // collect all items for 1s
        
          .concatMap(buffer => Observable
            .from(buffer) // make requests
            .delay(1000)  // delay this batch by 1s (rate-limit)
            .mergeMap(value => mockRequest(value)) // collect results regardless their initial order
            .toArray()
          )
          // .timestamp()
          .subscribe(val => console.log(val));
        

        观看现场演示:https://jsbin.com/mijepam/19/edit?js,console

        您可以尝试不同的初始延迟。只有25ms的请求会在10之前分批发送:

        [ 'R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9' ]
        [ 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'R16', 'R17', 'R18', 'R19' ]
        [ 'R20', 'R21', 'R22', 'R23', 'R24', 'R25', 'R26', 'R27', 'R28', 'R29' ]
        [ 'R30', 'R31', 'R32', 'R33', 'R34', 'R35', 'R36', 'R37', 'R38', 'R39' ]
        [ 'R40', 'R41', 'R42', 'R43', 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
        [ 'R50', 'R51', 'R52', 'R53', 'R54' ]
        

        但是对于.delay(175),我们将发出少于 10 个项目的批次,因为我们受到 1 秒延迟的限制。

        [ 'R0', 'R1', 'R2', 'R3', 'R4' ]
        [ 'R5', 'R6', 'R7', 'R8', 'R9', 'R10' ]
        [ 'R11', 'R12', 'R13', 'R14', 'R15' ]
        [ 'R16', 'R17', 'R18', 'R19', 'R20', 'R21' ]
        [ 'R22', 'R23', 'R24', 'R25', 'R26', 'R27' ]
        [ 'R28', 'R29', 'R30', 'R31', 'R32' ]
        [ 'R33', 'R34', 'R35', 'R36', 'R37', 'R38' ]
        [ 'R39', 'R40', 'R41', 'R42', 'R43' ]
        [ 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
        [ 'R50', 'R51', 'R52', 'R53', 'R54' ]
        

        但是,您可能需要的内容有所不同。由于.bufferTime(1000, ...)delay(1000),此解决方案最初在 2 秒延迟后开始发出值。所有其他排放都在 1 秒后发生。

        你最终可以使用:

        .bufferTime(1000, null, 10)
        .mergeAll()
        .bufferCount(10)
        

        这将始终收集 10 个项目,然后才会执行请求。这可能会更有效。

        【讨论】:

          【解决方案5】:

          使用Adamanswer。但是,请记住,传统的of().delay() 实际上会在每个元素之前添加延迟。特别是,这会延迟 observable 的第一个元素,以及实际上没有速率限制的任何元素。

          解决方案

          您可以通过让您的 concatMap 返回一个 立即 发出 a 值但仅在给定延迟后完成的 observable 流来解决此问题:

          new Observable(sub => {
            sub.next(v);
            setTimeout(() => sub.complete(), delay);
          })
          

          这有点拗口,所以我会为它创建一个函数。也就是说,由于在实际速率限制之外没有任何用处,所以最好只写一个 rateLimit 运算符:

          function rateLimit<T>(
              delay: number,
              scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction<T> {
            return concatMap(v => new Observable(sub => {
              sub.next(v);
              scheduler.schedule(() => sub.complete(), delay);
            }));
          }
          

          然后:

          queueRequest$.pipe(
              rateLimit(100),
            ).subscribe(...);
          

          限制

          这将在每个元素之后创建一个延迟。这意味着如果你的源 observable 发出它的最后一个值然后完成,那么你得到的速率受限的 observable 在它的最后一个值和完成之间会有一点延迟。

          【讨论】:

          • 希望有在 strictNullChecks 开启时编译的版本。
          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-06-13
          相关资源
          最近更新 更多