【Posted】:2018-09-11 21:49:30
【Question】:
I have hit another strange RxJava problem, and I suspect it is related to "CompositeDisposable.clear causes OkHttp to throw java.lang.IllegalStateException: Unbalanced enter/exit".
Is it the same issue?
The code looks like this:
Observable<Stuff> observable = Observable.create(new ObservableOnSubscribe<Stuff>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Stuff> e) throws Exception {
            //do OkHttp stuff, only place with network calls, then call onNext(stuff).
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

DisposableObserver<Stuff> disposableObserver = observable
    .subscribeWith(new DisposableObserver<Stuff>() {......});
disposables.add(disposableObserver);
The exception looks like this:
Caused by android.os.NetworkOnMainThreadException
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1303)
at java.net.SocketInputStream.read(SocketInputStream.java:150)
at java.net.SocketInputStream.read(SocketInputStream.java:120)
at okio.Okio$2.read(Okio.java:140)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:237)
at okio.RealBufferedSource.read(RealBufferedSource.java:47)
at okhttp3.internal.http1.Http1Codec$AbstractSource.read(Http1Codec.java:363)
at okhttp3.internal.http1.Http1Codec$FixedLengthSource.read(Http1Codec.java:407)
at okhttp3.internal.Util.skipAll(Util.java:175)
at okhttp3.internal.Util.discard(Util.java:157)
at okhttp3.internal.http1.Http1Codec$FixedLengthSource.close(Http1Codec.java:424)
at okio.RealBufferedSource.close(RealBufferedSource.java:469)
at okhttp3.internal.cache.CacheInterceptor$1.close(CacheInterceptor.java:206)
at okio.RealBufferedSource.close(RealBufferedSource.java:469)
at okio.RealBufferedSource$1.close(RealBufferedSource.java:453)
at java.nio.channels.Channels$ReadableByteChannelImpl.implCloseChannel(Channels.java:255)
at java.nio.channels.spi.AbstractInterruptibleChannel$1.interrupt(AbstractInterruptibleChannel.java:166)
at java.lang.Thread.interrupt(Thread.java:957)
at java.util.concurrent.FutureTask.cancel(FutureTask.java:146)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.cancel(ScheduledThreadPoolExecutor.java:258)
at io.reactivex.internal.schedulers.ScheduledRunnable.dispose(ScheduledRunnable.java:107)
at io.reactivex.disposables.CompositeDisposable.dispose(CompositeDisposable.java:217)
at io.reactivex.disposables.CompositeDisposable.dispose(CompositeDisposable.java:80)
at io.reactivex.internal.schedulers.IoScheduler$EventLoopWorker.dispose(IoScheduler.java:210)
at io.reactivex.Scheduler$DisposeTask.dispose(Scheduler.java:464)
at io.reactivex.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:125)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.dispose(ObservableSubscribeOn.java:74)
at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.dispose(ObservableObserveOn.java:146)
at io.reactivex.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:125)
at io.reactivex.observers.DisposableObserver.dispose(DisposableObserver.java:91)
at io.reactivex.disposables.CompositeDisposable.dispose(CompositeDisposable.java:217)
at io.reactivex.disposables.CompositeDisposable.clear(CompositeDisposable.java:183)
at mypackage.MyActivity.onStop(MyActivity.java:320)
at android.app.Instrumentation.callActivityOnStop(Instrumentation.java:1297)
at android.app.Activity.performStop(Activity.java:7168)
at android.app.ActivityThread.performDestroyActivity(ActivityThread.java:4543)
at android.app.ActivityThread.handleDestroyActivity(ActivityThread.java:4609)
at android.app.ActivityThread.-wrap7(ActivityThread.java)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1692)
at android.os.Handler.dispatchMessage(Handler.java:102)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:6682)
at java.lang.reflect.Method.invoke(Method.java)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1520)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1410)
Edit: this is essentially the OkHttp code inside the subscribe method:
try {
    OkHttpClient.Builder okClientBuilder = new OkHttpClient.Builder();
    OkHttpClient client = okClientBuilder.build();
    Request.Builder builder = new Request.Builder()
        .get()
        .url(address);
    Call okCall = client.newCall(builder.build());
    Response res = okCall.execute();
    InputStream stream = res.body().byteStream();
    Parser parsed = Parser.parse(stream);
    stream.close();
    e.onNext(parsed);
} catch (IOException ex) {
    e.onError(ex);
}
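【Discussion】:
- Could you fill in your DisposableObserver as well? My theory is that your observer is trying to do something while the observable is being disposed.
- I don't override dispose() on DisposableObserver, because it is final.
- You still replaced the body of the DisposableObserver you created, so it is hard to narrow the problem down.
- According to the stack trace, none of my onNext, onError, or onComplete is being called, unless I am misreading it. It appears to call dispose on the CompositeDisposable, which in turn does a lot of things, including calling FutureTask.cancel, which triggers java.lang.Thread.interrupt. Because of that, I think this may be related to another problem I ran into with exactly the same code: stackoverflow.com/questions/49581241/…
- Please provide the implementations of "//do OkHttp stuff" and "subscribeWith". It might be github.com/square/okhttp/issues/1125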
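The chain described in the comments (dispose, then FutureTask.cancel, then Thread.interrupt, then Channels$ReadableByteChannelImpl.implCloseChannel) can be reproduced with the JDK alone. The sketch below is not the asker's code: BlockingStream, InterruptCloseDemo, and the "io-worker" thread name are hypothetical stand-ins for the OkHttp body stream and the Schedulers.io() worker. It only illustrates that when a thread is blocked reading from a channel created by Channels.newChannel(InputStream), cancelling its task with interruption appears to run the stream's close() on the cancelling thread, which in the stack trace above is the main thread.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InterruptCloseDemo {

    /** Hypothetical stand-in for a network stream: read() blocks until close() is called. */
    static final class BlockingStream extends InputStream {
        final CountDownLatch readStarted = new CountDownLatch(1);
        final CountDownLatch closed = new CountDownLatch(1);
        volatile String closedOn; // name of the thread that executed close()

        @Override
        public int read() throws IOException {
            readStarted.countDown(); // signal that the worker is now inside the channel read
            try {
                closed.await();      // simulate a blocking socket read
            } catch (InterruptedException ex) {
                throw new InterruptedIOException("read interrupted");
            }
            return -1;
        }

        @Override
        public void close() {
            closedOn = Thread.currentThread().getName();
            closed.countDown();
        }
    }

    /** Cancels a blocked channel read and returns the name of the thread that closed the stream. */
    public static String closingThreadName() {
        BlockingStream stream = new BlockingStream();
        // Wrapping a plain InputStream yields an interruptible channel.
        ReadableByteChannel channel = Channels.newChannel(stream);
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            Future<Integer> task = io.submit(() -> channel.read(ByteBuffer.allocate(16)));
            stream.readStarted.await(); // wait until the worker is blocked in read()
            task.cancel(true);          // like ScheduledRunnable.dispose: interrupts the worker
            return stream.closedOn;     // already set: close() ran during cancel(true)
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            io.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:    " + Thread.currentThread().getName());
        System.out.println("closed on: " + closingThreadName());
    }
}
```

Both printed lines should name the calling thread rather than io-worker. If the same thing happens in the question, the closing of the response body (which OkHttp drains in Util.discard before releasing the connection) is a socket read executed on the thread that called CompositeDisposable.clear(), which would explain the NetworkOnMainThreadException.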
Tags: android rx-java rx-java2 okhttp okhttp3