【问题标题】:RxJS buffer and delay until one other observable emit first timeRxJS 缓冲和延迟,直到另一个 observable 第一次发出
【发布时间】:2020-03-25 17:32:26
【问题描述】:

我很难制作一个复杂的可观察管道,如果有人能在那个管道上帮助我,我将不胜感激......

上下文

我有一个数据流,通过蓝牙给我一些值,这些值是我必须解码的数据帧。

这是一个名为 RX$ 的 BehaviorSubject。

现在在RX$,有时我会收到即时数据 (INST),有时会收到历史数据 (HIST)。使用 INST,除了其他东西外,我还收到发送数据版本和型号的设备。我成功生成了一个 observable,它能够用设备版本和型号计算我一个 JSON 对象,只要它没有两者都不会发出,我们称之为deviceVersionModelStream$

现在在另一边,我在一个我们称之为historyStream$ 的流中批量接收 HIST 数据帧,因为有很多数据,我使用bufferTime(2000) 来制作一个数据数组并依靠我的嵌入式数据库批量插入(而不是一个一个)。

到目前为止,这一直很好......

新用例

现在我的客户添加了一条新规则,他们的旧设备类型无法为我提供特定案例的一些数据,但使用相同的模式我知道它还给了我什么。

因此,在解码帧并插入数据库之前,我需要有设备版本和型号。

我的问题是,我怎么能延迟historyStream$ 的出现,只要deviceVersionModelStream$ 发出一次(它在其他地方也很常用)并且当它发生时,我想生成某种 JSON 对象原始框架和版本/模型。

但 ALSO 也会逐渐分派这些信息,以免像我之前的 bufferTime(2000) 那样压倒我的数据库批量插入?

我正在尝试使用缓冲区、mergeMap、延迟,但我很难实现这个……

也许有 RX 方面的强者可以帮助我?

非常感谢

【问题讨论】:

    标签: rxjs observable


    【解决方案1】:

    1 总体思路 - 在缓冲和非缓冲之间来回切换

    看起来您需要在开始时暂停(缓冲)historyStream$,然后在 deviceVersionModelStream$ 发出时取消暂停。一种暂停/取消暂停流的方法是。

    merge(
      source$.pipe(bufferToggle(pauseOn$, () => pauseOff$)),
      source$.pipe(windowToggle(pauseOff$, () => pauseOn$))
    ).pipe(mergeAll())
    

    另见:https://medium.com/@kddsky/pauseable-observables-in-rxjs-58ce2b8c7dfd

    您的具体情况

    对于您的情况,这将是:

    const versionModel$ = deviceVersionModelStream$.pipe(take(1));
    merge(
      historyStream$.pipe(bufferToggle(of(0), v => versionModel$)),
      historyStream$.pipe(windowToggle(versionModel$, v => NEVER))
    ).pipe(
      mergeAll(),
      bufferTime(2000)
    );
    

    如果您想在每次发出时都可以访问deviceVersionModelStream$ 的输出,您可以使用combineLatest。如果对historyStream$deviceVersionModelStream$ 的多个订阅有问题,您可以事先使用share

    const versionModel$ = deviceVersionModelStream$.pipe(share(), take(1));
    const historyStreamShared$ = historyStream$.pipe(share());
    combineLatest(
      versionModel$,
      merge(
        historyStreamShared$.pipe(bufferToggle(of(0), v => versionModel$)),
        historyStreamShared$.pipe(windowToggle(versionModel$, v => NEVER))
      ).pipe(
        mergeAll(),
        bufferTime(2000)
      )
    ).subscribe(console.log);
    

    https://stackblitz.com/edit/rxjs-oyctsg

    编辑(如果您在开始时只需要一次缓冲区)

    在您的情况下,一旦关闭缓冲区,您就不需要再次使用缓冲区,即一旦您从 bufferToggle 切换到 windowToggle 流,您就不需要切换回 bufferToggle 流。

    这允许一种稍微简单的方法,只使用buffer 而不是bufferTogglewindowToggle

    2 总体思路 - 仅在开头缓冲

    source$ = dataStream.pipe(share()) // make sure this is a hot observable
    pauseOff$ = timer(5000) // make sure this observable emits once and completes
    
    concat(
      source$.pipe(buffer(pauseOff$), mergeAll()), // start with a buffered stream
      source$ // switch to the unbuffered stream when pauseOff$ emits and completes the previous stream
    )
    

    您的具体情况

    const historyStream$ = timer(0, 100).pipe(share(), take(200));
    const versionModel$ = of("version-model").pipe(delay(5000), take(1));
    
    combineLatest(
      versionModel$,
      concat(
        historyStream$.pipe(buffer(versionModel$), mergeAll()),
        historyStream$
      ).pipe(
        bufferTime(2000)
      )
    ).subscribe(console.log);
    

    https://stackblitz.com/edit/rxjs-2ptrux

    【讨论】:

    • 哇哦,好吧……我试试这个,但我真的不知道如何使用 windowToggle?你有关于暂停/取消暂停流组合的文章吗?
    • bufferToggle 只是缓冲pauseOn$ -> pauseOff$ 之间的值,并在pauseOff$ 上发出这些值。在其他时间到达的值将被忽略。这就是为什么您需要带有windowToggle 的第二个流来发出pauseOff$ -> pauseOn$ 之间的值。
    【解决方案2】:

    这篇文章有点老了,但我最近在寻找“缓冲区 rxjs 可观察对象直到另一个事件发生”时偶然发现了它,我想我会分享一个更新版本的样子。

    我在https://thinkrx.io 上构建了它。这是我使用的代码:

    const { rxObserver } = require('api/v0.3');
    const { timer, of, combineLatest, concat } = require('rxjs');
    const { delay, take, share, buffer, mergeAll, bufferTime, filter, takeUntil, skipUntil } = require('rxjs/operators');
    
    
    
    const historyStream$ = timer(0, 10).pipe(
      share(), take(10)
    );
    
    const versionModel$ = of("A").pipe(
      delay(50), 
      take(1)
    );
    
    historyStream$.subscribe(rxObserver('History Stream'));
    versionModel$.subscribe(rxObserver('Version Model'));
    
    combineLatest([
      versionModel$,
      concat([
        historyStream$.pipe(buffer(versionModel$)),
        historyStream$.pipe(skipUntil(versionModel$))
      ]).pipe(
        mergeAll()
      )]
    ).subscribe(rxObserver("Buffer Window"));
    

    这是输出:

    这里发生的事情是我们将两个 observables 传递给concat:一个代表缓冲的事件集,另一个代表传入的事件流。我们最终将所有这些传递到@987654326 @ 以获得适当的效果。

    【讨论】:

    • 感谢分享,对科学总是好的;)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-30
    • 2018-03-17
    相关资源
    最近更新 更多