【问题标题】:Hot and shared Observable from an EventEmitter来自 EventEmitter 的热门和共享 Observable
【发布时间】:2016-03-05 05:58:13
【问题描述】:

有没有办法从 EventEmitter (或 Angular 2 alpha 46 / RxJS 5 alpha 中提供的等效项)获得一个热可观察对象?即如果我们在解析值后订阅,它会触发之前解析的值。类似于我们总是返回相同的承诺。

理想情况下,仅使用 Angular 2 对象(我在某处读到稍后会嵌入轻量级 RxJS 以消除依赖关系),否则导入 RxJS 就可以了。 AsyncSubject 似乎符合我的需要,但它在 RxJS 5 alpha 中不可用。

我尝试了以下操作,但没有成功(从不触发)。知道如何使用它吗?

let emitter = new EventEmitter<MyObj>();
setTimeout(() => {emitter.next(new MyObj());});
this.observable = emitter;
return this.observable.share();

Full plunker here comparing hot and cold

用例:仅访问某些异步对象一次(例如,一系列 HTTP 调用合并/包装在 new EventEmitter 中),但提供已解析的异步对象订阅它的任何服务/组件,即使它们在解决后订阅(收到 HTTP 响应)。

编辑:问题不在于如何合并 HTTP 响应,而是如何从 EventEmitter 或任何可用于 Angular 2 alpha 46 / RxJS 5 alpha 的等价物获得(热?)可观察的,允许在检索/解析异步结果(HTTP 只是异步源的一个示例)。 myEventEmitter.share() 不起作用(参见上面的 plunker),尽管它适用于 HTTP 返回的 Observable(参见 plunker from@Eric Martinez)。从 Angular 2 alpha 46 开始,.toRx() 方法不再存在,EventEmitter 是可观察对象和主体本身。

只要我们总是返回相同的 Promise 对象,这与 Promise 配合得很好。由于我们在 HTTP Angular 2 服务中引入了观察者,因此我想避免混淆承诺和观察者(据说观察者比承诺更强大,所以它应该允许用承诺做容易的事情)。

Specs about share()(我还没有找到版本 5 alpha 的文档 - Angular 2 使用的版本) - 处理 Angular 2 HTTP 服务返回的 Observable,而不是 EventEmitter。

编辑:澄清了为什么不使用 HTTP 返回的 Observable,并补充说不直接使用 RxJS 会更好。

编辑:更改描述:关注的是多个订阅,而不是合并 HTTP 结果。

谢谢!

【问题讨论】:

  • 我在plnkr 中有一个使用共享的 http 示例。当您的用例使用 Http 模块时,为什么要询问将 EventEmitter 转换为某物?
  • 我的描述有误导性,用例稍微复杂一些:一系列 HTTP 调用,一旦完成,我将结果合并/包装到一个新的 Observable 中。等同于 promises(angular 1 语法):return $q.all(httpPromisesToResolve);
  • 我认为您不需要share()。你需要的是flatMap。检查这个example
  • 其实我更关心的是如何拥有多个订阅者,并且只触发一次 HTTP 请求/触发异步进程,不管对结果做了什么转换,虽然很高兴看到什么是flatMap 可能(我学会了,谢谢:))。 @user3743222 建议的 AsyncSubject 似乎是解决方案,现在我正在检查如何在带有 RxJS lib v.5 alpha 的 Angular 2 中使用它。
  • share() 在 HTTP 返回的 Observer 上运行良好(我刚刚检查过,它正在工作,转换为 hot observable),但它在 EventEmitter 上不起作用。

标签: javascript angular typescript rxjs


【解决方案1】:

您似乎描述的功能不是冷可观察的功能,而是Rx.BehaviourSubject 的功能。在这里查看关于 Rxjs 主题的解释:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/subjects.md

我从那里引用:

BehaviourSubject 类似于 ReplaySubject,只是它只存储它发布的最后一个值。 BehaviourSubject 在初始化时也需要一个默认值。当主体还没有收到其他值时,这个值被发送给观察者。这意味着所有订阅者将在订阅时立即收到一个值,除非主题已经完成。

Rx.AsyncSubject 将是最接近承诺的行为:

AsyncSubject 类似于 Replay 和 Behavior 主题,但是它只会存储最后一个值,并且只有在序列完成时才发布它。当源 observable 很热并且可能在任何观察者订阅它之前完成时,您可以使用 AsyncSubject 类型。在这种情况下,AsyncSubject 仍然可以提供最后一个值并将其发布给任何未来的订阅者。

另外两个cmets:

  • 在你的插件中:this._coldObservable = emitter.share();。使用 share 返回一个 hot observable!
  • EventEmitter 实际上首先扩展了主题

更新: 将 EventEmitter 包裹在 Rx.Observable 周围:

function toRx ( eventEmitter ) {
  return Rx.Observable.create(function ( observer ) {
    eventEmitter.subscribe(function listener ( value ) {observer.onNext(value)});
    // Ideally you also manage error and completion, if that makes sense with Angular2
    return function () {
      /* manage end of subscription here */
    };
  };
)
}

一旦你拥有Rx.Observable,你就可以申请share()shareReplay(1),任何你想要的。

我敢打赌,Angular 团队迟早会提出一个桥接功能,但如果你不想等待,你可以自己做。

【讨论】:

  • 感谢您的回答!看来AsyncSubject 是我正在寻找的东西,尽管Angular lib 提供的对象会更好。但我无法在 Angular 2 使用的 RxJS 库中找到它(版本 5.0.0-alpha.7 - 我修复了问题中的版本号)。有什么想法吗? import {AsyncSubject} from '@reactivex/rxjs/dist/cjs/Rx'; 之类的东西似乎 BehaviorSubject 和 ReplaySubject 可用,但它们都没有完全按预期工作。 PS:好像我把hotcold这两个字倒过来了,hot是我想的。
  • 我真的不明白你想要什么。假设你的 http 调用返回一个 observable,你可以用 Rx.Observable.forkJoin ([httpCalls]) 模拟 $q.all(httpPromisesToResolve)。参照。 github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/…reactivex.io/documentation/operators/zip.html(向下滚动到 js 部分和 forkJoin 文档)。然后使用Rx.Observable.forkJoin ([httpCalls]).share(),您可以订阅所有您想要的share obs。,Rx.Observable.forkJoin ([httpCalls]) 将只有一个订阅
  • 现在如果你想获得最后一个值,即使你在 observable 完成后订阅,请改用Rx.Observable.forkJoin ([httpCalls]).shareReplay(1)。参照。 github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/…。我不知道这在5.0.0-alpha.7 版本中是否同样有效。
  • 另外,如果你想将 observable 隐藏在你的事件发射器后面,用于forkJoin,你可以使用yourEventEmitter.toRx()
  • 我改写了我最初的问题以澄清。我担心的是即使订阅了多次,也只触发一次异步进程,类似于即使在它被解决后也总是返回相同的承诺。 HTTP 只是异步进程的一个示例。 share() 和 shareReplay(1) 正在处理 HTTP observable,而不是 EventEmitter,如 plunker 所示。无论如何,感谢您建议 forkJoin!这肯定有助于改进代码,我不知道 :)
【解决方案2】:

ReplaySubject 正在做我正在寻找的事情。 @robwormald 提供了一个关于 gitter 的工作示例,我稍作修改以便更好地演示。

暴露 HTTP 响应:

import {Injectable} from 'angular2/angular2';
import {Http} from 'angular2/http';
import {ReplaySubject} from '@reactivex/rxjs/dist/cjs/Rx'

@Injectable()
export class PeopleService {
  constructor(http:Http) {
    this.people = new ReplaySubject(1);

    http.get('api/people.json')
      .map(res => res.json())
      .subscribe(this.people);
  }
}

多次订阅:

// ... annotations
export class App {
  constructor(peopleService:PeopleService) {

    people.subscribe(v => {
      console.log(v)
    });

    //some time later

    setTimeout(() => {
      people.subscribe(v => {
        console.log(v)
      });
      people.subscribe(v => {
        console.log(v)
      });
    },2000)
  }
}

Full plunker

编辑:BehaviorSubject 是另一种选择。在这个用例中,区别在于初始值,例如,如果我们想在使用 HTTP 响应更新之前显示缓存中的内容。

【讨论】:

  • 所以我们最终还是回到了起点。很高兴你找到了答案!作为 SO 的新成员的要点: 1. 尽可能地将您的需求与(想象的)解决方案分开(您坚持使用 event emitter 并最终没有使用 - 这实际上经常发生)并展示您当前的(失败) 方法,2. 在概念和措辞中尽可能准确。示例:问题的第一句:if we subscribe after the value is resolved, it triggers with the previously resolved value. - 可以通过 promises 理解,但对于 observables 则不再适用。
  • 但无论如何,您总是需要一个聊天会话来正确表达您的问题。因此,对于所有将要查看问题和答案的人来说,只需提及前面的要点。
  • 看来我没有用正确的词来表达我正在寻找的行为(我不知道如何用 Observable 词汇表达resolve),感谢您的建议!我会尝试应用它。
猜你喜欢
  • 2023-03-05
  • 2017-07-15
  • 1970-01-01
  • 2017-01-04
  • 1970-01-01
  • 1970-01-01
  • 2017-06-20
  • 2012-09-11
  • 2018-01-27
相关资源
最近更新 更多