【问题标题】:RxJava: OnNext Unsubscribe is not workingRxJava:OnNext 取消订阅不起作用
【发布时间】:2016-12-28 18:07:26
【问题描述】:

我在收到来自 observable 的第一项后尝试取消订阅。它似乎不起作用。我做错了什么?

    public class ObservableAndSubscriber {

    public static void main(final String[] args) {
        final Observable<String> strObservable = Observable.create(s -> {
            while (true) {
                s.onNext("Hello World!!");
            }
        });

        final Subscriber<String> strSubscriber = new Subscriber<String>() {

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(final Throwable e) {
                e.printStackTrace();

            }

            @Override
            public void onNext(final String t) {
                System.out.println(t);
                this.unsubscribe();

            }
        };
        strObservable.subscribe(strSubscriber);
    }
}

结果似乎在无限循环中打印“Hello World”。

【问题讨论】:

标签: java jakarta-ee rx-java reactive rx-java2


【解决方案1】:

要取消订阅工作,您需要在循环中检查订阅状态。否则它将无限运行,耗尽您的 CPU。

处理无限序列的最简单方法是利用库提供的方法Observable.generate()Observable.fromIterable()。他们会为您进行适当的检查。

还要确保您订阅并观察不同的线程。否则,您将只为单个订阅者提供服务。

你的例子:

strObservable = Observable.generate(g -> g.onNext("Hello!"));
strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);

【讨论】:

    【解决方案2】:

    我在阅读 Tomasz Nurkiewicz 的书时做了这个例子。后来他有一个类似的例子,并解释了我的代码有什么问题。

    无限循环是邪恶的!因为,我在我的 observable create lambda 表达式中有一个,它在我的主线程的上下文中执行并无限期地订阅块。为了让可观察的 create lambda 在不同的线程中执行,这意味着主线程永远不会被阻塞并且 subscribe() 完成。我要求订阅发生在 IO 线程上,结果成功了,这意味着无限循环发生在 IO 线程上并且不会阻塞主线程。

    strObservable.subscribeOn(Schedulers.io()).subscribe(strSubscriber);
    

    【讨论】:

    • 我认为这并不能解决退订问题。在main() 的末尾添加Thread.sleep(1000) 以延迟程序终止,看看它是如何进行的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-27
    • 1970-01-01
    • 1970-01-01
    • 2016-12-25
    • 2018-07-15
    • 2019-02-04
    相关资源
    最近更新 更多