【问题标题】:Replace callbacks with observables from RxJava用 RxJava 中的 observable 替换回调
【发布时间】:2017-12-05 05:46:34
【问题描述】:

我使用监听器作为回调来观察 Android 的异步操作,但我认为用 RxJava 替换这个监听器可能会很棒,我是新使用这个库,但我真的很喜欢它,而且我总是在 Android 项目中使用它。

这是我要重构的代码:

public void getData( final OnResponseListener listener ){
   if(data!=null && !data.isEmpty()){
       listener.onSuccess();
   }
   else{
       listener.onError();
   }
}

一个简单的回调:

public interface OnResponseListener {
   public void onSuccess();
   public void onError(); 
}

还有“观察者”:

object.getData( new OnResponseListener() {
    @Override
    public void onSuccess() {
       Log.w(TAG," on success");
    }

    @Override
    public void onError() {
       Log.e(TAG," on error");
    }
});

谢谢!

【问题讨论】:

标签: java rx-java


【解决方案1】:

例如,您可以使用 Observable.fromCallable 来使用您的数据创建 observable。

public Observable<Data> getData(){
    return Observable.fromCallable(() -> {
        Data result = null;
        //do something, get your Data object
        return result;
    });
}

然后使用您的数据

 getData().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(data -> {
                //do something with your data
            }, error -> {
                //do something on error
            });

使用 rxjava 1.x 和 lambda 表达式。

编辑:

如果我理解你的话,你想替换那个监听器,而不是把它包装成 observable。我参考您的评论添加了其他示例。哦.. 如果您只期待一个项目,您也应该使用 Single

public Single<Data> getData() {
        return Single.create(singleSubscriber -> {
            Data result = object.getData();
            if(result == null){
                singleSubscriber.onError(new Exception("no data"));
            } else {
                singleSubscriber.onSuccess(result);
            }
        });
    }

getData().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(data -> {
                //do something with your data
            }, error -> {
                //do something on error
            });

【讨论】:

  • 谢谢。我要试试你的例子
  • 可以说:嘿,我准备好了,就像 listener.onSuccess() 一样?例如在没有返回的方法中,只需触发一个事件?
  • 我知道这有点离题,但很难弄清楚或找到以下内容:如何从标准 Java observable(Java 10 中已弃用)docs.oracle.com/javase/7/docs/api/java/util/Observable.html 更改为 RxJava Observables?包装或更换的任何提示或指针?谢谢
【解决方案2】:

您正在寻找Completable.create:

Completable:表示没有任何值但仅指示完成或异常的延迟计算。该类遵循与 Reactive-Streams 类似的事件模式:onSubscribe (onError|onComplete)?

Completable.create(subscriber -> {
    object.getData(new OnResponseListener() {
        @Override
        public void onSuccess() {
           subscriber.onCompleted();
        }

        @Override
        public void onError() {
           subscriber.onError(* put appropriate Throwable here *);
        }
    }
})
...//apply Schedulers
.subscribe((() -> *success*), (throwable -> *error*));

【讨论】:

    【解决方案3】:

    我将如何重构您的代码;除了 getData 方法,我将添加包装为 Single 的 getData 方法:

    public void getData( final OnResponseListener listener ){
        if(data!=null && !data.isEmpty()){
            listener.onSuccess();
        }
        else{
            listener.onError();
        }
    }
    
    public Single<Boolean> getDataSingle() {
        return Single.create(new SingleOnSubscribe<Boolean>() {
            @Override
            public void subscribe(SingleEmitter<Boolean> e) throws Exception {
                getData(new OnResponseListener() {
                    @Override
                    public void onSuccess() {
                        e.onSuccess(true);
                    }
    
                    @Override
                    public void onError() {
                        e.onSuccess(false);
                    }
                });
            }
        });
    }
    

    或者使用 Java 8:

    public Single<Boolean> getDataSingle() {
        return Single.create(e -> getData(
                new OnResponseListener() {
                    @Override
                    public void onSuccess() {
                        e.onSuccess(true);
                    }
    
                    @Override
                    public void onError() {
                        e.onSuccess(false);
                    }
                })
        );
    }
    

    现在您已经在回调的 API 旁边公开了一个 Rx API。假设它是您自己的某种 DataProvider,您现在可以在不处理回调的情况下使用它,如下所示:

    dataProvider.getDataSingle()
            .map(result -> result ? "User exist" : "User doesn't exist")
            .subscribe(message -> display(message));
    

    我使用了 Rx2,但使用 Rx1 的逻辑是一样的。

    我还使用了Single 而不是 Observable,因为您只等待一个值。兴趣是您的功能更具表现力的合同。

    你不能代表 Observable 发出值,即调用类似 myObservable.send(value) 的东西。第一个解决方案是使用Subject。另一种解决方案(上面的那个)是使用 Observable.create()(或 Single.create())创建 observable。您调用回调方法并在方法 Observable.create() 中创建侦听器,因为在 Observable.create() 中您可以调用 onSuccess() 方法,该方法告诉 Observable 传递一个值。

    这是我用来将回调包装成 observable 的。一开始有点复杂,但很容易适应。

    我给你另一个例子,正如你所问的那样。假设您要将 EditText 的更改显示为 Snackbar:

    View rootView;
    EditText editTextView;
    
    //Wrap Android addTextChangedListener into an Observable
    Observable<String> textObservable = Observable.create(consumer ->
            editTextView.addTextChangedListener(new TextWatcher() {
                @Override
                public void beforeTextChanged(CharSequence s, int start, int count, int after) {
    
                }
    
                @Override
                public void onTextChanged(CharSequence s, int start, int before, int count) {
    
                }
    
                @Override
                public void afterTextChanged(Editable s) {
                    consumer.onNext(s.toString());
                }
            })
    );
    
    //Use it
    textObservable.subscribe(text -> Snackbar.make(rootView, text, Snackbar.LENGTH_SHORT).show());
    

    【讨论】:

    • 你能用 lambdas 发布一个完整的例子吗?尝试这段代码我有点迷失了
    • 添加了两个例子,一个是你的代码,一个是TextView的text属性的变化。
    • 在处理订阅导致内存泄漏时,侦听器未取消注册。
    • 你是对的,而且比这更糟糕;如果 getData() 在处理订阅后产生一个新值,它将崩溃。我应该在 getDataSingle() 中添加一个 doOnSubscribe 来删除监听器。在第二个示例中(通常)不存在该问题,因为发射器和接收器都属于同一个 Activity(或 Fragment)并且将一起进行 GC。
    【解决方案4】:
    Maybe.<String>create(new MaybeOnSubscribe<String>() {
          @Override
          public void subscribe(MaybeEmitter<String> e) throws Exception {
            OnSuccessListener(uri->{
              e.onSuccess(uri));
            })
            .addOnFailureListener(throwable -> {
              e.onError(throwable);
            });
          }
        });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-09-19
      • 1970-01-01
      • 1970-01-01
      • 2018-12-10
      • 1970-01-01
      • 2016-08-14
      • 2018-08-23
      • 2020-11-11
      相关资源
      最近更新 更多