【问题标题】:How do I convert async task/ rx java code to rxjava2?如何将 asynctask/rxjava 代码转换为 rxjava2?
【发布时间】:2017-12-12 01:18:04
【问题描述】:

我尝试将下面的AsyncTask 代码转换为Rxjava2,但显然Rxjava2 不处理空值,因此我的应用程序崩溃了。这是我的AsyncTask 代码:

new AsyncTask<Void, Void, Void>() {
            @Override
            protected Void doInBackground(Void... params) {
                Set<Map.Entry<String, Participant>> entries = pool.entrySet();
                for (Map.Entry<String, Participant> entry : entries) {
                    Participant participant = entry.getValue();
                    participant.release();
                }
                return null;
            }

            @Override
            protected void onPostExecute(Void aVoid) {
                cb.event(new Spin.Event<Void>());
            }
        }.execute();

这是转换为Rxjava(不是Rxjava2)的代码:

 Observable.defer(new Func0<Observable<Void>>() {
        @Override
        public Observable<Void> call() {
            Set<Map.Entry<String, Participant>> entries = pool.entrySet();
            for (Map.Entry<String, Participant> entry : entries) {
                Participant participant = entry.getValue();
                participant.release();
            }
            return Observable.just(null);
        }
    }).doOnCompleted(new Action0() {
        @Override
        public void call() {
            cb.event(new Spin.Event<Void>());
        }
    })
    .subscribeOn(Schedulers.computation())
    .subscribe();

将其转换为Rxjava 而不在返回 null 时崩溃的最佳方法是什么。另外,.execute() 相对于Rxjava2 是如何发挥作用的?不确定这是否适用于Rxjava

这是崩溃日志:

FATAL EXCEPTION: RxComputationThreadPool-3

                                                                           io.reactivex.exceptions.OnErrorNotImplementedException: null ObservableSource supplied
                                                                               at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
                                                                               at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
                                                                               at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63)
                                                                               at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63)
                                                                               at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:35)
                                                                               at io.reactivex.Observable.subscribe(Observable.java:10842)
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
                                                                               at java.util.concurrent.FutureTask.run(FutureTask.java:237)
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
                                                                               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
                                                                               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
                                                                               at java.lang.Thread.run(Thread.java:818)
                                                                            Caused by: java.lang.NullPointerException: null ObservableSource supplied
                                                                               at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
                                                                               at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:32)
                                                                               at io.reactivex.Observable.subscribe(Observable.java:10842) 
                                                                               at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38) 
                                                                               at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26) 
                                                                               at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154) 
                                                                               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) 
                                                                               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) 
                                                                               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) 
                                                                               at java.lang.Thread.run(Thread.java:818) 

【问题讨论】:

  • 不清楚这里是什么为空。请添加堆栈跟踪
  • 请发布您的堆栈跟踪,同时您还需要添加 .observeOn(AndroidSchedulers.mainThread())。
  • 我会尽快发布。另外,返回 Observable.just(null);这在 rxjava 2 中是否允许?
  • 感谢@Geros,请查看崩溃日志。(刚刚编辑了我上面的问题)。另外,有没有更好的方法将我的原始 asynctask 代码转换为 rxjava 2?

标签: android android-asynctask rx-android rx-java2


【解决方案1】:

由于您没有要回发到主线程的值,您可以使用Completable

Completable.fromAction(() -> {
    Set<Map.Entry<String, Participant>> entries = pool.entrySet();
    for (Map.Entry<String, Participant> entry : entries) {
        Participant participant = entry.getValue();
        participant.release();
    }
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
    () -> {
         cb.event(new Spin.Event<Void>());
    }, 
    error -> { /* show error toast */ }
);

【讨论】:

    【解决方案2】:
    Observable.defer(new Callable<ObservableSource<?>>() {
                @Override
                public ObservableSource<?> call() throws Exception {
                    Set<Map.Entry<String, Participant>> entries = pool.entrySet();
                    for (Map.Entry<String, Participant> entry : entries) {
                       Participant participant = entry.getValue();
                       participant.release();
                    }
                    return Completable.complete().toObservable();
                }
            }).doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d("Complete", "Complete");
                }
            })
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread()).subscribe();
    

    此代码也可以使用。调用 subscribe() 方法将启动作业。

    【讨论】:

    • 如果不使用 completeable 来修复我的代码并使其正常工作,就没有其他方法了吗?我尝试添加 subscribeon(androidschedulers.mainthread()) 但导致项目为空崩溃。一定有办法改善我已经拥有的东西吧?
    • 您可以将上述代码的 Completeable 替换为 Observable
    • 我不能真正在函数外使用 cb,因为它是主函数 Callback cb 的一部分,我在其中调用 observable。我不能像现在这样在一个地方使用所有东西,但里面有某种形式的错误处理吗?它抛出错误未处理的异常,并且 item 经常为 null。我如何将我的 asyntask 中的 .execute() 转换为 rx?
    • 继续得到这个:io.reactivex.exceptions.OnErrorNotImplementedException:该项目在 io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept 处为空
    • 谢谢!这似乎可行,但是出于好奇,rxjava 2 与以前的 rxjava 甚至 asynctask 相比有什么优势?它会提高我的应用性能吗?
    【解决方案3】:
    Observable.defer(new Callable<ObservableSource<?>>() {
    
     //This method is replacing doInBackground
            @Override
            public ObservableSource<?> call() throws Exception {  
                Set<Map.Entry<String, Participant>> entries = pool.entrySet();
                for (Map.Entry<String, Participant> entry : entries) {
                   Participant participant = entry.getValue();
                   participant.release();
                }
                return Completable.complete().toObservable();
            }
        }).doOnComplete(new Action() {
         //This is onPostExecute
            @Override 
            public void run() throws Exception {
                Log.d("Complete", "Complete");
            }
        })
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread()).subscribe()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-02-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多