【问题标题】:RxJs merging streamsRxJs 合并流
【发布时间】:2017-03-16 17:11:44
【问题描述】:

我期待合并流并刷新所有订阅者。

出于函数式编程的目的,concat 不会更新当前的 observable,我不知道什么是正确的方法。

这就是我希望事情发生的方式:

var observable = Rx.Observable.from(['stream1']);
var subscription = observable.subscribe(
    function(x) {
        console.log('Next: ' + x);
    },
    function(err) {
        console.log('Error: ' + err);
    },
    function() {
        console.log('Completed');
    });
observable.concat(Rx.Observable.from(['stream2']));

此代码仅读取第一个流然后完成。 当 concat 时,它会创建一个我并不真正想要的新 Observable,因为我已经订阅了第一个。

什么是正确的方法,因为我什至无法进入第一个可观察对象?

谢谢!

【问题讨论】:

    标签: rxjs rxjs5


    【解决方案1】:

    您尝试做的事情并不完全可能,但您基本上有两个选择,都涉及使用Subject

    1) 手动发送数据

    const obs$ = Rx.Observable.of("stream1");
    const subj$ = new Rx.Subject();
    
    Rx.Observable.merge(obj$, subj$)
        .subscribe(
            x => console.log('Next: ' + x),
            x => console.log('Error: ' + x),
            () => console.log('Complete')
        );
    
    subj$.next("stream2");
    subj$.next("stream3");
    

    但是:在这种情况下,永远不会调用 complete,因为 Subject 永远不会自行完成 - 因此,如果您需要触发 complete-handler,您将拥有在末尾添加手册subj$.complete();


    2) 通过主题进行组播

    const obs$ = Rx.Observable.of("stream1");
    const subj$ = new Rx.Subject();
    
    subj$.subscribe(
        x => console.log('Next: ' + x),
        x => console.log('Error: ' + x),
        () => console.log('Complete')
    );
    
    obs$.subscribe(x => subj$.next(x));
    const obs2$ = Rx.Observable.of("stream2");
    obs2$.subscribe(x => subj$.next(x));
    

    在这种情况下,Subject 基本上将充当一个“代理”,它只会传播数据,而不会传播错误或完整触发器。

    这两种解决方案都不是真正的“好” - 但也许您可以更好地概述您的用例,我确信有一个合适的解决方案,不涉及任何复杂的解决方法。


    如果您只想有一种方法可以从永久 Observable 中持续提供数据,您应该使用 BehaviorSubject - 它的工作方式是,您可以在其上发出数据并同时订阅它:

    class Service {
        public data$ = new BehaviorSubject(someInitialDataOrNull);
    
        public getData() {
            makeSomeHttpCall()
                .subscribe(data => data$.next(data));
        }
    }
    
    class Component {
        constructor() {
            theService.data$.subscribe(data => console.log(data));
        }
    }
    

    这是BehaviorSubject 的旧文档的链接(它基本上仍然以相同的方式工作,期望onNext 现在是next,等等...)

    【讨论】:

    • 非常感谢您的回答。我的想法是服务将提供来自 api 或其他来源的可观察数据。我的其他组件正在订阅它,因此它需要从一开始就成为可观察的,并且当数据可用时,它会提供给其他组件。为此目的的正确模式是什么?
    • 非常感谢。你成功了!
    猜你喜欢
    • 2020-09-01
    • 2022-12-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-11
    • 1970-01-01
    • 1970-01-01
    • 2019-10-05
    相关资源
    最近更新 更多