【问题标题】:RxJava 2 equivalent to isUnsubscribedRxJava 2 相当于 isUnsubscribed
【发布时间】:2017-06-13 15:34:49
【问题描述】:

我一直在研究Reactive Programming with RxJava 书中的示例,该书针对的是版本 1 而不是 2。无限流的介绍有以下示例(并注意有更好的方法来处理并发):

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
    Runnabler = () -> {
        BigInteger i = ZERO;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i);
            i = i.add(ONE);
        }
    };
    new Thread(r).start();
});

...

Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();

但是,在 RxJava 2 中,传递给 create() 方法的 lambda 表达式是 ObservableEmitter 类型,并且没有 isUnsubscribed() 方法。我查看了What's Different in 2.0 并搜索了存储库,但找不到任何此类方法。

在 2.0 中如何实现相同的功能?

已编辑以包含以下给出的解决方案(n.b. 使用 kotlin):

val naturalNumbers = Observable.create<BigInteger> { emitter ->
    Thread({
        var int: BigInteger = BigInteger.ZERO
        while (!emitter.isDisposed) {
            emitter.onNext(int)
            int = int.add(BigInteger.ONE)
        }
    }).start()
}

val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }

Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()

【问题讨论】:

    标签: rx-java rx-java2 rx-kotlin


    【解决方案1】:

    订阅 Observable 后,返回 Disposable。您可以将其保存到本地变量并检查 disposable.isDisposed() 以查看它是否仍在订阅。

    【讨论】:

      猜你喜欢
      • 2018-05-27
      • 2018-01-24
      • 1970-01-01
      • 2017-02-08
      • 2011-01-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-30
      相关资源
      最近更新 更多