【问题标题】:Using takeUntil to complete an RxJS stream使用 takeUntil 完成一个 RxJS 流
【发布时间】:2017-01-28 22:02:32
【问题描述】:

我想使用takeUntil 运算符完成一个不能自然完成的内部流,如下所示:

  outerObservable
    .mergeMap(() => {
        innerObservable
            .takeUntil(stopObservable$)
    });

这行得通,内部流按预期完成,但我希望外部流在停止信号之后返回最后一个值。即使经过大量的谷歌搜索,我仍然不知道如何。

编辑:

我编写了一个似乎可以解决问题的运算符,但由于我知道有更好的方法或者我完全误解了一些东西,所以让问题悬而未决。

function takeUntilThen(notifier, oneLastValue) {
    return Rx.Observable.create(subscriber => {
        var source = this;
        notifier.subscribe(() => {
            subscriber.next(oneLastValue);
            subscriber.complete()
        });
        return source.subscribe(value => {
            subscriber.next(value);
        },
        err => subscriber.error(err),
        () => subscriber.complete());
    });
}

【问题讨论】:

  • 小心,你的运营商泄露了它的通知订阅!
  • 谢谢,我会调查一下 - 当我弄清楚它的意思时:-)

标签: javascript rxjs system.reactive redux-observable


【解决方案1】:

听起来你想在innerObservablestopObservable 之间比赛,无论哪个获胜,都应该能够输出一些东西

您可以为此使用恰当命名的.race() 运算符,而不是使用.takeUntil()。请参阅关于取消的 redux-observable 文档配方中的示例:

https://redux-observable.js.org/docs/recipes/Cancellation.html#cancel-and-do-something-else-eg-emit-a-different-action

const somethingEpic = action$ =>
  action$.ofType(SOMETHING)
    .mergeMap(action =>
      innerObservable
        .race(
          stopObservable
            .take(1)
        )
    );

由于您的示例是伪代码,因此这里是更具体的示例:

import { ajax } from 'rxjs/observable/dom/ajax';

const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .mergeMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(response => fetchUserFulfilled(response))
        .race(
          action$.ofType(FETCH_USER_CANCELLED)
            .map(() => incrementCounter())
            .take(1)
        )
    );

【讨论】:

  • 感谢您的回答。当内部可观察对象永远不会自然完成时,这会起作用吗?我对 redux-observable 文档中提供的解决方案进行了查看和尝试,但它不适用于我的情况。
  • @mysomic 你能解释一下“永远不会自然完成”是什么意思吗?
  • 在那个例子中,ajax 流只返回一个值然后完成。我的流(我在完全相同的地方)在事件发生时返回许多值,并且它永远不会(自然地)完成。因此,至少在我看来,不可能有比赛......至少我认为这就是为什么该示例不适用于我的特定情况:-) 我已经使用 ajax 流使其正常工作示例
  • 当使用.race() 时,获胜者是第一个 发出一些东西。获胜者可以继续发射,但失败者退订。听起来您可能在说您不希望它成为赢家与输家,因为您希望 stopObservable 能够停止 innerObservable ,即使在 innerObservable 发出了许多东西之后,更符合 takeUntil 让您做的事情?
  • 如果是这种情况,我需要知道 innerObservable 和 stopObservable 发出了什么来给出建议。 可能正常使用 takeUntil(stopObservable) 并在另一个史诗中再次收听相同的 stopObservable 或使用 merge()
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-12-22
  • 2019-11-15
  • 2019-01-16
  • 2019-07-22
  • 1970-01-01
  • 2018-07-28
  • 1970-01-01
相关资源
最近更新 更多