【问题标题】:RxJs Observable completes multiple timesRxJs Observable 多次完成
【发布时间】:2017-06-14 17:04:26
【问题描述】:

下面是响应式代码的短代码sn-p (RxJs)

let subj = new Rx.Subject();
let chain = subj
    .switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv)))
    .share()
    .take(1);


function subscribe(){
  chain.subscribe(v => console.log("Next", v),
                  err => console.log("Error",err),
                  () => console.log("Completed"));
  chain.subscribe(v => console.log("Next2", v),
                  err => console.log("Error2",err),
                  () => console.log("Completed2"));
  subj.next(Math.random());
}

subscribe();
subscribe();
subscribe();
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

根据documentationchain 是一个Observable,它应该打印发出的值 * 10 (switchMap),同时只打印一次,不管它有多少订阅 (share ),仅对第一个发出的值执行此操作,然后完成。

前两个项目符号工作正常,但最后一个项目符号不行。 这是我得到的输出:

Switch map 9.022491050934722
Next 9.022491050934722
Completed
Next2 9.022491050934722
Completed2
Switch map 9.172999425126836
Next 9.172999425126836
Completed
Next2 9.172999425126836
Completed2
Switch map 6.168790337405257
Next 6.168790337405257
Completed
Next2 6.168790337405257
Completed2

如您所见,chain 正在多次完成。
是什么让您可以多次完成相同的Observable

【问题讨论】:

    标签: javascript rxjs observable rxjs5 reactivex


    【解决方案1】:

    sharepublishrefCount组合的快捷方式,这意味着只要至少有1个订阅者,流就只是“热”的,所以流完成后,所有活跃的订阅者会自动取消订阅,这反过来会重置流,因为那时有 0 个订阅者。另外:您应该将take(1) 放在share 之前,因为任何后续操作都会影响热状态。

    如何使流“真正地”共享/热,独立于任何订阅者:使用publishconnect 流:

    let subj = new Rx.Subject();
    let chain = subj
        .switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv)))
        .take(1)
        .publish();
    chain.connect();
    
    function subscribe(){
      chain.subscribe(v => console.log("Next", v),
                      err => console.log("Error",err),
                      () => console.log("Completed"));
      chain.subscribe(v => console.log("Next2", v),
                      err => console.log("Error2",err),
                      () => console.log("Completed2"));
      subj.next(Math.random());
    }
    
    subscribe();
    subscribe();
    subscribe();
    <script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

    【讨论】:

    • 所以您是说从share 返回的每个Observable 在完成后都会被重置和重新评估?
    • 不是自动的,但只要有新的订阅者。
    • 这解释了一些事情。希望他们在文档中有它。在share 部分之后我没有得到take。能详细点吗?
    • 它在文档中:“只要有至少一个订阅者,这个 Observable 就会被订阅并发出数据。当所有订阅者都取消订阅时,它将从源 Observable 取消订阅” - 当然是 Rx 的基本原则之一:当您取消订阅时,流停止,当您订阅时,流将从 0 开始。 - 请记住,在您的示例中,您是 not 完成主题,但只有你的子流——如果你完成了Subject,那么它的行为会更像你预期的那样(尽管出于完全不同的原因,而不是因为共享)
    • 这是基本原则之一,您可能会在谷歌上搜索“RxJS 介绍” - 每当流(任何流)完成或抛出错误时,任何订阅者都会自动取消订阅 - 至于订单:@ 987654335@ 返回一个共享流,但是如果您附加一个运算符,您将获得一个新流,该流仅共享到 share-operator 的点 - 与 take(n) 并没有太大区别,不过如果有一个 switchMap 例如在分享之后你会看到很多不想要的东西——所以经验法则应该是把任何分享运营商放在最后。
    猜你喜欢
    • 2019-05-10
    • 2020-03-26
    • 1970-01-01
    • 2021-11-16
    • 2016-03-09
    • 1970-01-01
    • 2021-08-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多