您尝试做的事情并不完全可能,但您基本上有两个选择,都涉及使用Subject:
1) 手动发送数据
const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();
Rx.Observable.merge(obj$, subj$)
.subscribe(
x => console.log('Next: ' + x),
x => console.log('Error: ' + x),
() => console.log('Complete')
);
subj$.next("stream2");
subj$.next("stream3");
但是:在这种情况下,永远不会调用 complete,因为 Subject 永远不会自行完成 - 因此,如果您需要触发 complete-handler,您将拥有在末尾添加手册subj$.complete();。
2) 通过主题进行组播
const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();
subj$.subscribe(
x => console.log('Next: ' + x),
x => console.log('Error: ' + x),
() => console.log('Complete')
);
obs$.subscribe(x => subj$.next(x));
const obs2$ = Rx.Observable.of("stream2");
obs2$.subscribe(x => subj$.next(x));
在这种情况下,Subject 基本上将充当一个“代理”,它只会传播数据,而不会传播错误或完整触发器。
这两种解决方案都不是真正的“好” - 但也许您可以更好地概述您的用例,我确信有一个合适的解决方案,不涉及任何复杂的解决方法。
如果您只想有一种方法可以从永久 Observable 中持续提供数据,您应该使用 BehaviorSubject - 它的工作方式是,您可以在其上发出数据并同时订阅它:
class Service {
public data$ = new BehaviorSubject(someInitialDataOrNull);
public getData() {
makeSomeHttpCall()
.subscribe(data => data$.next(data));
}
}
class Component {
constructor() {
theService.data$.subscribe(data => console.log(data));
}
}
这是BehaviorSubject 的旧文档的链接(它基本上仍然以相同的方式工作,期望onNext 现在是next,等等...)