【问题标题】:How to set a timeout on a RxJS Observable with a cache throught `.publishReplay()`?如何通过`.publishReplay()`在带有缓存的RxJS Observable上设置超时?
【发布时间】:2017-01-23 19:12:33
【问题描述】:

我创建了一个 Observable 来缓存他们的结果一段时间。这个例子很棒而且非常有用!但我无法为项目生产者设置超时。我试图在mockDataFetch() 中使用超时运算符,但在第一个失败的项目之后,流无法恢复。 如何实现mockDataFetch 超时?

这正是我所做的:

const Observable = Rx.Observable;

var counter = 1;
var updateRequest = Observable.defer(() => mockDataFetch())
    .publishReplay(1, 1000)
    .refCount();

function mockDataFetch() {
    return Observable.of(counter++)
        .delay(Math.floor((Math.random() * 100) + 1))
        .timeout(50);
}

function mockHttpCache() {
    return updateRequest
        .take(1);
}

另一方面,如果在mockDataFetch 中获得异常会发生什么?我希望在下一个项目上(1000 毫秒后,正如它在 publishReplay 方法中定义的那样),observable 会发出一个新项目。

【问题讨论】:

    标签: javascript rxjs rxjs5


    【解决方案1】:

    我认为我应该更新示例并添加这个用例,因为这是一种很常见的情况(无论如何,我很高兴你发现它很有用!)。

    当从mockDataFetch() 返回的 Observable 发送错误/完成通知时,内部的 Subject 将自己标记为已停止(参见解释 Rx.Subject loses events),因此它不会重新发送任何项目。理想情况下,您可以使用mockDataFetch() 中的catch() 运算符捕获所有错误:

    function mockDataFetch() {
        return Observable.of(counter++)
            .delay(Math.floor((Math.random() * 100) + 1))
            .timeout(50)
            .catch(err => Observable.of('This request is broken.'));
    }
    

    观看现场演示:https://jsbin.com/jiguti/5/edit?js,console

    由此产生的输出可能如下所示:

    Response 0: This request is broken.
    Response 50: This request is broken.
    Response 200: This request is broken.
    Response 1200: 2
    Response 1500: 2
    Response 3500: This request is broken.
    

    【讨论】:

    • 优秀的答案。谢谢!!我听从了你的提议。我希望找到一种方法来接收错误,同时让未来的观察者可以使用 observable。我知道根据 ReactiveX 规范,在发生错误或完整通知后不可能接收到新项目,但也许有一个包装其他项目的可观察对象,仍然可以获得类似的效果。也就是说,第一个 observable 截取其下划线 observable 的发射,如果发生错误,它会为将来的订阅构建一个新的 observable。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-22
    • 2017-09-09
    • 1970-01-01
    相关资源
    最近更新 更多