【问题标题】:Combine rxjs streams, detect changes and return one value合并 rxjs 流,检测变化并返回一个值
【发布时间】:2020-04-07 09:36:27
【问题描述】:

背景 - Angular + .net Core WebApi,假设我正在构建一个时间管理应用程序来跟踪你在任务上花费了多少时间 -> 用户在前端创建一个任务并启动它 -> 有一个计时器显示经过的时间。

这个想法 - 经过的时间来自后端。当用户通过服务在前端启动任务时,我在后端启动一个计时器并将经过的时间传递回前端,以便我可以将其展示给用户。即使后端和前端之间存在连接问题,我也想显示正确的值。

图表:后端流每秒发出一次值(我向用户显示的经过时间),但存在连接故障,6 秒后它冻结了片刻,然后发送 9(“0:09”),这可能会造成混淆用户(“之前是 6 秒,现在是 9 秒??”)。所以我在每秒发出新值的前端创建和间隔。每秒钟我都想检查后端流是否发送了新值,如果没有,我想获取以前的值并对其进行修改,以便用户获得正确的值。

bStream => ---1---2---3---4---5---6---x---x---9
fStream => ---1---2---3---4---5---6---7---8---9

What user sees:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 (freeze) -> 00:09

What I want to user to see:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> (freeze - frontend knows there is no new value so adds 1 second to previous)
So, it should look like that:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> 00:07 -> 00:08 -> 00:09

我正在努力从哪里开始。

快速调整了两个流,但不知道如何找到它 bStream 没有发出新值。

https://stackblitz.com/edit/typescript-yookbj

【问题讨论】:

  • 为什么不在前端花费时间?

标签: javascript typescript .net-core rxjs ngrx


【解决方案1】:

这是一种方法:

const be$ = concat(
  of(1).pipe(delay(100)),
  of(2).pipe(delay(100)),
  of(3).pipe(delay(100)),
  of(4).pipe(delay(100)),
  of(5).pipe(delay(100)),
  of(6).pipe(delay(100)),

  of(10).pipe(delay(500)), // After freeze
  of(11).pipe(delay(100)),
  of(12).pipe(delay(100)),
).pipe(shareReplay({ bufferSize: 1, refCount: true, }), endWith(null));

// `skip(1)` - the `ReplaySubject` used be `shareReplay()` will give us the latest value
// and it's not needed
const beReady$ = be$.pipe(skip(1), take(1));

const fe$ = be$.pipe(
  mergeMap(v => merge(
    of(v),
    of(v).pipe(
      expand(v => timer(100).pipe(map(v1 => 1 + v))),
      takeUntil(beReady$),
    )
  )),
  distinctUntilChanged(),
  filter(v => v !== null)
).subscribe(console.log)

endWith(null) - 为了在发出最后一个值 (12) 时停止递归,我们需要源发出其他内容

shareReplay - 除了主订阅者(fe$)

之外,还有另一个订阅者(beReady$) 需要共享源
mergeMap(v => merge(
  of(v), // Pass along the current value
  of(v).pipe(
    // If the `be$` does not emit in the next 100ms, send the `currentValue + 1`
    // and keep doing the same until the `be$` finally emits
    expand(v => timer(100).pipe(map(v1 => 1 + v))),
    takeUntil(beReady$),
  )
)),

expand 就像使用mergeMap,但是:

  • 它将沿内部值传递
  • 它将根据最后一个内部值创建另一个内部可观察对象;所以,它是递归的
  • takeUntil(beReady$) 是停止递归的方法

StackBlitz

【讨论】:

  • 应该这么复杂吗? switchMap 和一个小的延迟做同样的事情要容易得多。
  • 我认为这是 expand 的一个很好的用例。但是,我同意,switchMap 会简化事情。
  • 作为一个用例,它是:) 一个不错的运算符。抱歉跑题了。
  • 别担心!你是对的,出于某种原因,我没有想到“swtichMap”
【解决方案2】:

请检查下面的代码。要模拟同步问题,请将代码v * 5 更改为v * 4,然后计数器一旦获得它就会尊重来自“后端”的值。

// emit every 5s
const source = interval(5000).pipe(
    map(v1 => v1 + 1), // <- only for the example. starting counting from 1.
    startWith(0), // <- only for the example. instant emit of 0.

    map(v => v * 5), // <- every 5 seconds we get right passed amount. so it emits 0 5 10 15.
);
// emit every 1s
const secondSource = interval(1000).pipe(
    delay(50), // <- we want it to be a bit later than original counter.
    map(v1 => v1 + 1), // emits are 1 2 3, not from 0.
    startWith(0), // instant emit of 0.
);

source.pipe( // <- it is our slow backend (it gives us an update every 5 seconds.
    switchMap(v => secondSource.pipe( // if no emit from parent stream - then in 1.05 we get value from this one.
        map(v1 => v + v1), // - adding offset from the second stream to the parent stream.
    )),
).subscribe(v => console.log(v));

现在即使后端有滞后,它也可以从 0 平稳地计数到 N。

更新

还有更简单的方法,但问题是它不依赖后端响应的时间并且有自己的 1 秒周期。

pipe(
  bufferTime(1000), // <- collects for a second.
  scan((a, b) => b.length ? a + 1 : b[0], 0), // assumes or returns.
);

【讨论】:

    猜你喜欢
    • 2019-10-05
    • 1970-01-01
    • 2017-03-05
    • 1970-01-01
    • 2021-03-26
    • 2012-04-19
    • 1970-01-01
    • 2017-05-15
    • 1970-01-01
    相关资源
    最近更新 更多