【问题标题】:SwitchMap and PublishReplay in combinationSwitchMap 和 PublishReplay 组合
【发布时间】:2018-01-26 08:12:21
【问题描述】:

我正在处理一个 Angular 项目,但我不完全理解为什么这段代码 sn-p 没有按预期工作:

const httpObs = Rx.Observable.of("")
  .do(() => console.log("trigger http call")) //only triggered once
  .publishReplay(1).refCount();

let observable = Rx.Observable.timer(0,1000)
  .do((e) => console.log("new event: " + e))
  .switchMap(() => this.httpObs);

const c1 = observable.subscribe(() => console.log("subscriber 1"));
const c2 = observable.subscribe(() => console.log("subscriber 2"));

JS Bin

我不明白为什么 Rx.Observable.timer 发出的每个事件都不会触发“触发 http 调用”。我的期望是,只要一个订阅者订阅了 observable,每个发出的事件都会触发 http 调用。但是,每个订阅者都会收到相同的排放,这意味着每个事件只有一个 http 调用。

我知道我可以通过在 observable 上使用 publishReplay + refcount 来修复它(见下文),但这仅适用于这个简单的示例。

我真正尝试实现的是两个组件使用相同的 http 调用响应,并且每个组件都会发出一个 onInit 事件,然后将其 switchMapped 到 http-response。我不希望每个新订户都触发呼叫。但是,我希望在发出另一个事件时触发调用。 onInit observable 与 click-observable 合并,只要用户点击“更新”按钮,它就会在 switchMap 操作符用于转换之前发出新事件。现在的问题是,更新按钮不再触发 http 调用。

这个例子展示了它应该如何表现:

const httpObs = Rx.Observable.of("")
  .do(() => console.log("trigger http call"));

let observable = Rx.Observable.timer(0,1000)
  .do((e) => console.log("new event: " + e))
  .switchMap(() => this.httpObs)
  .publishReplay(1);

observable.subscribe(() => console.log("subscriber 1"));
observable.subscribe(() => console.log("subscriber 2"))

observable.connect();

JS Bin

【问题讨论】:

    标签: javascript rxjs reactive-programming


    【解决方案1】:

    您遇到的问题是所有 Subject 实例都有其内部状态,当它们收到 completeerror 通知时,它们会将自己标记为“已停止”并且永远不会发出任何东西(这是所谓的可观察合同)。你可以看看我前段时间写的这篇文章https://medium.com/@martin.sikora/rxjs-subjects-and-their-internal-state-7cfdee905156。它涵盖了非常相似的主题。

    在您的代码中,您多次使用相同的 httpObs,但源 Observable 是 of(),它发出一个值,然后发送 complete 通知。

    看到它真的叫:http://jsbin.com/fuvuxax/2/edit?js,console

    const httpObs = Rx.Observable.of("")
      .do(() => console.log("trigger call"), undefined, () => console.log('complete'))
      .publishReplay(1).refCount();
    

    这意味着publishReplay 中的 Subject 收到了 complete 通知,并且不会再次订阅其源 Observable。但是,由于publishReplay 在内部使用ReplaySubject,它仍然会刷新其缓冲区,然后发送complete 通知。但这对您来说不是很有用,因为我认为您想再次执行 HTTP 调用,而不仅仅是重播过时的调用。

    您可以做的是将httpObs 转换为每次switchMap 收到值时创建链的方法:

    const httpObs = () => Rx.Observable.of("")
      .do(() => console.log("trigger call"), undefined, () => console.log('complete'));
    
    let observable = Rx.Observable.timer(0,1000)
      .do((e) => console.log("new event: " + e))
      .take(3)
      .switchMap(() => this.httpObs())
      .publishReplay(1).refCount();
    
    observable.subscribe(() => console.log("subscriber 1"));
    observable.subscribe(() => console.log("subscriber 2"));
    

    您可以看到,"trigger call" 只打印了 3 次,如果我正确理解您的描述,我认为这就是您想要的:

    "trigger call"
    "subscriber 1"
    "subscriber 2"
    "complete"
    "trigger call"
    "subscriber 1"
    "subscriber 2"
    "complete"
    "trigger call"
    "subscriber 1"
    "subscriber 2"
    "complete"
    

    【讨论】:

    • 我需要一些时间来理解它的含义,但非常感谢您的帮助和非常详细的回答!
    • 顺便说一句,这个例子有一个小错误: // jsbin.com/hotidih/5/edit?js,console - 最后一行应该是 replay.subscribe(...
    • 我刚刚注意到您的示例并没有解决问题,因为它与我的第二个 sn-p 基本相似。我试图实现的是一个共享的 httpObs observable,它注意新订阅(例如来自其他组件)不会因为新订阅而触发调用,但只有在发出新事件时才会触发。
    • @Daniel 我不知道你想做什么,你需要告诉我你期望的输出。
    • 与您创建的输出相同,但方式不同。图像 httpObs 在服务中,“可观察”是一个组件,并且有多个组件在观察 httpObs。我试图在这个例子中总结一下:jsbin.com/bikubujufu/edit?js,console
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-12
    • 2019-02-27
    • 2018-06-04
    • 2023-03-25
    • 2022-08-03
    相关资源
    最近更新 更多