【Question title】: Main thread ends before CompletableObserver gets called
【Posted】: 2018-12-22 11:36:51
【Question】:

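In the code below, I created an example to learn functional programming with Rx. I am trying to treat a HandlerThread as an observable. In onResume(), I subscribe to a Single.just observable to start the HandlerThread.

The SingleObserver callbacks are invoked correctly. However, the CompletableObserver in onLooperPrepared() is never called, even though Completable.complete() is called.

I have also posted the logcat output. Please take a look and tell me why I am not receiving any logs from the CompletableObserver.

Code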

onResume() {
    this.mMyHandlerThreadInitSingleObs = Single.just(this.getInitializedHandlerThread())
            .map(myHandlerThread->{
                Log.d(TAG_LOG, "BEFORE .start()");
                myHandlerThread.start();
                Log.d(TAG_LOG, "AFTER .start()");

                return this.mMyHandlerThread;
            });
    this.mMyHandlerThreadInitSingleObs
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this.getSingleObserver());

}
private static class MyHandlerThread extends HandlerThread {
    private final static String TAG_LOG = ActMain.TAG_LOG + "." + MyHandlerThread.class.getSimpleName();
    private Handler mHandler = null;

    MyHandlerThread(String name) {
        super(name);
        Log.v(MyHandlerThread.TAG_LOG, "constructor called");
    }

    @Override
    protected void onLooperPrepared() {
        super.onLooperPrepared();
        String TAG_LOG = MyHandlerThread.TAG_LOG + "." + "onLooperPrepared";
        Log.d(TAG_LOG, "this.getLooper(): " + this.getLooper());

        Completable.create(emitter -> {
            if (this.getLooper() != null) {
                Log.d(TAG_LOG+"."+"Completable.create", "this.getLooper() initialized: " + this.getLooper());
                Completable.complete();
            } else {
                Log.e(TAG_LOG+"."+"Completable.create", "this.getLooper() is null: " + this.getLooper());
                Completable.error(new NullPointerException(THROW_NPE_ON_LOOPER_NULL));
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.trampoline())
        .subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG_LOG, "[onSubscribe]");
            }

            @Override
            public void onComplete() {
                Log.i(TAG_LOG, "[onComplete]");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG_LOG, "[onError]: e.getMessage(): " + e.getMessage());
            }
        });
    }
}

private SingleObserver<MyHandlerThread> getSingleObserver() {
    String TAG_LOG = ActMain.TAG_LOG + "." + "getSingleObserver()";
    return new SingleObserver<MyHandlerThread>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v(TAG_LOG, "[onSubscribe]");
        }

        @Override
        public void onSuccess(MyHandlerThread myHandlerThread) {
            Log.v(TAG_LOG, "[onSuccess]");
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG_LOG, "[onError]");
        }
    };
}

logcat

2018-12-22 12:40:12.118 V/ActMain: onStart
2018-12-22 12:40:12.120 V/ActMain.MyHandlerThread: constructor called
2018-12-22 12:40:12.125 V/ActMain.getSingleObserver(): [onSubscribe]
2018-12-22 12:40:12.129 D/ActMain.onResume(): BEFORE .start()
2018-12-22 12:40:12.129 D/ActMain.onResume(): AFTER .start()
2018-12-22 12:40:12.130 V/ActMain.MyHandlerThread.onLooperPrepared: ..
2018-12-22 12:40:12.130 01 D/ActMain.MyHandlerThread.onLooperPrepared: this.getLooper(): Looper (my HandlerThread, tid 352) {1353cc7}
2018-12-22 12:40:12.131 01 V/ActMain.MyHandlerThread.onLooperPrepared: [onSubscribe]
2018-12-22 12:40:12.132 D/ActMain.MyHandlerThread.onLooperPrepared.Completable.create: this.getLooper() initialized: Looper (my HandlerThread, tid 352) {1353cc7}
2018-12-22 12:40:12.139 V/ActMain.getSingleObserver(): [onSuccess]
2018-12-22 12:40:12.436 I/ActivityManager: Displayed com.example.amrbakri.rxhandlerthread_01/.ActMain: +552ms

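【Comments】:

  • I can see that you do get the log from Completable.create. What are you expecting?
  • @RicardKollcaku I expect to receive logs from the CompletableObserver!
  • You are receiving the logs from Completable.create and from subscribe.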

Tags: android java-8 observable reactive-programming rx-java2


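【Solution 1】:

First of all, the call Completable.complete(); has no effect here: it is a static factory method that returns a new Completable, and that return value is simply discarded. To move the stream to onComplete, you need to call onComplete() on the emitter, like this: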

    Completable.create(emitter -> {
        // Signal completion through the emitter that create() passes in.
        emitter.onComplete();
        System.out.println("1");
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.trampoline())
    .subscribe(new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("2");
        }

        @Override
        public void onComplete() {
            System.out.println("3");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("4");
        }
    });
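
The same reasoning applies to the error branch in the question: Completable.error(...) also just returns a new Completable, so the failure has to go through emitter.onError(...). The following is a minimal sketch of both branches, assuming RxJava 2 (the io.reactivex package) on the classpath. The class name EmitterSignalDemo, the methods looperReady and neverSignals, and the plain Object standing in for the Looper are hypothetical, chosen only so the example runs outside Android.

```java
import io.reactivex.Completable;
import java.util.concurrent.TimeUnit;

public class EmitterSignalDemo {

    // Hypothetical stand-in for the looper check in the question:
    // completes when 'looper' is non-null, signals an error otherwise.
    static Completable looperReady(Object looper) {
        return Completable.create(emitter -> {
            if (looper != null) {
                emitter.onComplete(); // terminal signal reaches the observer
            } else {
                emitter.onError(new NullPointerException("looper is null"));
            }
        });
    }

    // Mirrors the mistake in the question: Completable.complete() builds a
    // separate Completable that is discarded, so the emitter never signals.
    static Completable neverSignals() {
        return Completable.create(emitter -> Completable.complete());
    }

    public static void main(String[] args) {
        // blockingGet() returns null on completion, or the emitted Throwable.
        System.out.println(looperReady(new Object()).blockingGet() == null);
        System.out.println(looperReady(null).blockingGet() instanceof NullPointerException);
        // blockingAwait(timeout, unit) returns false if no terminal event
        // arrived before the timeout elapsed.
        System.out.println(neverSignals().blockingAwait(200, TimeUnit.MILLISECONDS));
    }
}
```

In the asker's onLooperPrepared(), this would amount to replacing Completable.complete() with emitter.onComplete() and Completable.error(...) with emitter.onError(...); the subscribeOn/observeOn chain and the CompletableObserver can stay as they are.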
