【问题标题】:Rxjs buffer when source emitions rate is fast源发射速率快时的 Rxjs 缓冲区
【发布时间】:2018-04-10 11:58:30
【问题描述】:

假设我有一个可观察的以每秒 x0 次发射(可能是 50、60...)的速度发射,有时它只是每秒 1 或 2 次发射。

现在我怎样才能缓冲那些快速发射并仍然处理慢速发射。

我累了什么:

BufferTime 需要一个时间跨度,因此即使一次发射它也会被缓冲,(加上 BufferTime 会使量角器测试超时)。

BufferCount(x) 在收到所有 x 个发射之前不会发射。

【问题讨论】:

  • 你是在描述这个:保持一个发射 X 时间,如果另一个发射在 X 到期之前进入,那么缓冲它们。缓冲到 Y 的最大时间或直到发射没有在 X 之前出现(以先到者为准)。因此,慢发射会延迟 X,而快速发射会延迟 X 和 Y。基本上是用于缓冲的 debounce 运算符。
  • 我不明白问题出在哪里
  • @martin as bygrace 说类似 debounce 但用于缓冲(即,将值作为数组返回而不是忽略它们)
  • 听起来你可以使用bufferTime(time, null, size)。这最多缓冲size 项或任何较少数量的项目,最多为time
  • 或者你可以使用source.buffer(source.debounceTime(X)),这样它就会缓冲直到执行去抖动。如果您能提供一个通过流的消息的实际示例以及它们的时间以及您对如何缓冲它们的期望,这将有所帮助。

标签: rxjs rxjs5


【解决方案1】:

听起来你想要类似于 debounce + buffer 的东西。最简单的实现是使用流的去抖动来触发发射同一流的缓冲区。您可能希望共享流以防止重复订阅。这是一个运行示例:

const source = new Rx.Observable.create((o) => {
  let count = 0;
  const emit = () => {
    const timeout = Math.random() * 1000;
    setTimeout(() => {
      o.next(count++);
      if (count < 20) {
      	emit();
      } else {
        o.complete();
      }
    }, timeout);
  };
  emit();
}).share();

const triggerBuffer = source.debounceTime(500);
source.buffer(triggerBuffer).subscribe((x) => { console.log(x); });
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"&gt;&lt;/script&gt;

请注意,去抖动没有上限,因为如果它在去抖动时间内继续接收值,它就不会发出。实际上,这在您的场景中应该不会产生影响,但在其他场景中理论上可以。

【讨论】:

    【解决方案2】:

    正如 bygrace 所说,您正在寻找的是debounce + buffer

    在现代 RXJS 6 和 ES6 Typescript 中添加类型推断,我创建了一个自定义 OperatorFunction 来非常简单地执行此操作,称为 bufferDebounce

    type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
    
    const bufferDebounce: BufferDebounce = debounce => source =>
      new Observable(observer => 
        source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
          next(x) {
            observer.next(x);
          },
          error(err) {
            observer.error(err);
          },
          complete() {
            observer.complete();
          },
      })
    // [as many sources until no emit during 500ms]
    source.pipe(bufferDebounce(500)).subscribe(console.log);
    

    你可以在这里看到一个工作示例https://stackblitz.com/edit/rxjs6-buffer-debounce

    希望这对您和任何新人都有帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-09-12
      • 1970-01-01
      • 1970-01-01
      • 2018-12-06
      • 1970-01-01
      • 1970-01-01
      • 2012-09-17
      • 2022-11-11
      相关资源
      最近更新 更多