【问题标题】:How to use DisposableObserver with lambda expressions in RxJava2如何在 RxJava2 中使用带有 lambda 表达式的 DisposableObserver
【发布时间】:2017-04-18 11:18:00
【问题描述】:

我的用例是想在我的 onNext 中的某个条件后处理。所以尝试使用 DisposableObserver。这是有效的代码

Observable.just(1, 2, 3, 4)
    .subscribe(new DisposableObserver<Integer>() {
                     @Override
                     public void onNext(Integer integer) {
                       System.out.println("onNext() received: " + integer);
                       if (integer == 2) {
                         dispose();
                       }
                     }
                     @Override
                     public void onError(Throwable e) { System.out.println("onError()"); }
                     @Override
                     public void onComplete() { System.out.println("onComplete()"); }
                   }
    );

现在,如果您尝试用 lambda 替换它,它会将 lambda 视为

subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)

现在就这样做。通过从 onSubscribe 保存一次性然后调用disposable.dispose();从 onNext.

  private Disposable disposable;
  private void disposableObserverTest() {
    Observable.just(1, 2, 3, 4)
        .subscribe(integer -> {
              System.out.println("onNext() received: " + integer);
              if (integer == 2) {
                disposable.dispose();
              }

            }, throwable -> System.out.println("error"),
            () -> System.out.println("complete"),
            disposable1 -> {
              this.disposable = disposable1;
            });
  }

但是,如果你想直接调用 dispose() 怎么用 lambdas 呢?

【问题讨论】:

  • “这给出了一个错误”——什么类型的?
  • 错误是找不到 dispose() 因为它将 lambda 视为 Consumer super T> onNext, Consumer onError,操作 onComplete。但我希望 lambda 成为 DisposableObserver。

标签: android lambda rx-java rx-android rx-java2


【解决方案1】:

你可以使用 takeUntil 来关闭 observable。

@Test
public void takeUntil() throws Exception {
    Observable.just(1, 2, 3, 4)
            .takeUntil(integer -> integer == 2)
            .test()
            .assertValues(1, 2);
}

【讨论】:

    【解决方案2】:

    这是因为在第一种情况下你调用了

    subscribe(DisposableObserver observer)
    

    而在第二种情况下,您调用

    subscribe(Action1<? extends Integer> onNext, Action1<? extends Throwable> onError, Action0 onComplete)
    

    这意味着在第二种情况下,您不持有对 DisposableObserver 的引用,因此您不能在其上调用 dispose()

    【讨论】:

    • 正是问题所在。所以我想在第二种情况下使用带有 lambdas 的 DisposableObserver。
    猜你喜欢
    • 1970-01-01
    • 2013-07-02
    • 1970-01-01
    • 2021-10-11
    • 1970-01-01
    • 1970-01-01
    • 2021-01-31
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多