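[Question title]: RxJava flatMap crashing even though onError is present
[Posted]: 2018-10-08 10:09:37
[Question description]:

I am trying to debug an Android crash that happens when the device has no network connection. The crash only happens some of the time (roughly 50%), and I cannot figure out why. I would appreciate it if someone could explain what is going on or offer a solution.

I am using flatMap to fetch images in parallel and subscribing with an onError() block. Despite that, the app still crashes.

Here is the code (I marked the line where it crashes with "Crash here:"):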

fun downloadImages() {
    var progress = 0
    var maxProgress = 0

    downloadModel.postValue(
            Resource.inProgress(getString(R.string.downloading_offline_data)))

    disposables.add(Observable
            .create<Pair<String, File>> { emitter ->
                val files = readFileDirs()

                for (file in files) {
                  val urlList = readUrlListFrom(file)
                  maxProgress += urlList.size

                  for (url in urlList) {
                    emitter.onNext(url to file)
                  }
                }
                emitter.onComplete()
            }
            .flatMap { (url, file) ->
                downloadImage(url, file)
                        .subscribeOn(Schedulers.computation())
                        .andThen(Observable.fromCallable {
                            ++progress
                        })
            }
            .doOnDispose {
                downloadModel.postValue(Resource.idle())
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                Log.d(TAG, "Done downloading $it")

                if (it % 100 == 0) {
                    refreshCacheSize()
                }

                downloadModel.postValue(Resource.inProgress(
                        getString(R.string.downloading_offline_data),
                        it,
                        maxProgress
                ))
            }, { error ->
                val errorId = when (error) {
                    is SocketTimeoutException ->
                        NETWORK_TIMEOUT_ERROR
                    is InterruptedIOException -> {
                        // do nothing
                        return@subscribe
                    }
                    is UnknownHostException ->
                        NETWORK_ERROR
                    is IOException ->
                        UNKNOWN_ERROR
                    is UnsupportedServerVersionException ->
                        UNSUPPORTED_VERSION_ERROR
                    else -> {
                        UNKNOWN_ERROR
                    }
                }
                downloadModel.postValue(Resource.error(errorId))
            }, {
                refreshCacheSize()

                downloadModel.postValue(Resource.success())
            })
    )
}

private fun downloadImage(url: String, outFile: File): Completable {
    val outFileTmp = File(outFile.canonicalPath + ".tmp")
    return Completable
            .create { emitter ->
                val request = Request.Builder()
                        .url(url)
                        .build()
                try {
                    // Crash here:
                    val response = Client.get().newCall(request).execute()
                    val sink: BufferedSink
                    try {
                        Log.d(TAG, "Writing to file")
                        val body = response.body()
                        if (body != null) {
                            sink = Okio.buffer(Okio.sink(outFileTmp))
                            sink.writeAll(body.source())
                            sink.close()
                        } else {
                            if (!emitter.isDisposed) {
                                emitter.onError(DownloadImageException("Body was null."))
                            }
                        }
                    } finally {
                        response.body()?.close()
                    }

                    outFileTmp.renameTo(outFile)

                    emitter.onComplete()
                } catch (e: Exception) {
                    Log.e(TAG, "Could not save splash", e)
                    emitter.tryOnError(e)
                }
            }
            .doOnDispose {
                outFileTmp.delete()
            }
            .doOnError {
                outFileTmp.delete()
            }
}

Stack trace:

2018-10-08 02:50:07.055 21984-22263/com.ggstudios.lolcatalyst E/AndroidRuntime: FATAL EXCEPTION: RxComputationThreadPool-2
    Process: com.ggstudios.lolcatalyst, PID: 21984
    java.lang.Throwable: Unable to resolve host "mywebsite.com": No address associated with hostname
        at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:157)
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:105)
        at java.net.InetAddress.getAllByName(InetAddress.java:1154)
        at okhttp3.Dns$1.lookup(Dns.java:40)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:185)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:149)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84)
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:214)
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
        at okhttp3.RealCall.execute(RealCall.java:77)
        at com.ggstudios.lolcatalyst.offline.OfflineDownloadViewModel$downloadImage$1.subscribe(OfflineDownloadViewModel.kt:372)
        at io.reactivex.internal.operators.completable.CompletableCreate.subscribeActual(CompletableCreate.java:39)
        at io.reactivex.Completable.subscribe(Completable.java:1918)
        at hu.akarnokd.rxjava2.debug.CompletableOnAssembly.subscribeActual(CompletableOnAssembly.java:39)
        at io.reactivex.Completable.subscribe(Completable.java:1918)
        at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64)
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(Thread.java:764)
     Caused by: java.lang.Throwable: android_getaddrinfo failed: EAI_NODATA (No address associated with hostname)
        at libcore.io.Linux.android_getaddrinfo(Native Method)
        at libcore.io.BlockGuardOs.android_getaddrinfo(BlockGuardOs.java:172)
        at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:137)
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:105) 
        at java.net.InetAddress.getAllByName(InetAddress.java:1154) 
        at okhttp3.Dns$1.lookup(Dns.java:40) 
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:185) 
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:149) 
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84) 
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:214) 
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135) 
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114) 
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200) 
        at okhttp3.RealCall.execute(RealCall.java:77) 
        at com.ggstudios.lolcatalyst.offline.OfflineDownloadViewModel$downloadImage$1.subscribe(OfflineDownloadViewModel.kt:372) 
        at io.reactivex.internal.operators.completable.CompletableCreate.subscribeActual(CompletableCreate.java:39) 
        at io.reactivex.Completable.subscribe(Completable.java:1918) 
        at hu.akarnokd.rxjava2.debug.CompletableOnAssembly.subscribeActual(CompletableOnAssembly.java:39) 
        at io.reactivex.Completable.subscribe(Completable.java:1918) 
        at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64) 
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38) 
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26) 
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) 
        at java.lang.Thread.run(Thread.java:764) 

Tags: android kotlin rx-java


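[Solution 1]:

I figured out the cause and a couple of possible solutions.

Let's start with the cause.

My code can be simplified like this: Observable #1 kicks off n other Observables, which execute on other threads via flatMap().

For simplicity, assume every Observable runs on its own thread. When one Observable emits an error, that error propagates to all the other Observables and disposes them.

The problem is that this propagation is not synchronized (apparently for performance reasons). Because of that, tryOnError() can pass its isDisposed() check, then get disposed, and then fire the error anyway. With no downstream left to receive it, the error is treated as undeliverable and crashes the app. This explains why the exception still occurs even when tryOnError() is used.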
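The underlying mechanism can be reproduced without any threads. The following is a minimal, single-threaded sketch (not the racy flatMap() case itself), assuming RxJava 2 on the classpath; the function name `emitErrorAfterDispose` is made up for illustration. It shows that an error emitted after disposal never reaches the subscriber's onError and is instead routed to the global RxJavaPlugins error handler, wrapped in an UndeliverableException.

```kotlin
import io.reactivex.Completable
import io.reactivex.CompletableEmitter
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins
import java.io.IOException

// Hypothetical helper: returns whatever RxJava routed to the global
// error handler, or null if nothing was routed there.
fun emitErrorAfterDispose(): Throwable? {
    var undelivered: Throwable? = null
    // Capture undeliverable errors instead of letting them crash the thread.
    RxJavaPlugins.setErrorHandler { undelivered = it }
    try {
        var emitter: CompletableEmitter? = null
        val disposable = Completable
            .create { e -> emitter = e } // keep the emitter so we can fail later
            .subscribe({}, { error -> println("subscriber saw $error") })

        // The consumer goes away first (in the question, flatMap does this
        // to its inner sources once one of them has failed).
        disposable.dispose()

        // The source fails afterwards: the subscriber's onError is NOT called.
        emitter!!.onError(IOException("late failure"))
    } finally {
        RxJavaPlugins.reset() // restore default plugin behaviour
    }
    return undelivered
}
```

Calling `emitErrorAfterDispose()` should return an UndeliverableException whose cause is the IOException. Replacing `onError` with `tryOnError` in this single-threaded sketch would simply return false and route nothing; the crash in the question needs the disposal to land between the check and the delivery.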

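Some solutions

After some research and discussion, I ended up with two main solutions.

One is to convert every error inside the inner Observables into a regular result via onErrorReturn(). This works, but it may not be a good fit if you want to stop execution as soon as the first error occurs.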
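A rough sketch of the onErrorReturn() approach, assuming RxJava 2. The `DownloadResult` types, `fakeDownload`, and `downloadAllCollectingErrors` are hypothetical stand-ins and not part of the original code; `fakeDownload` fails for any URL containing "bad" so the behaviour can be observed without a network.

```kotlin
import io.reactivex.Observable
import java.io.IOException

// Hypothetical result type: a failure becomes an ordinary item.
sealed class DownloadResult
data class Ok(val url: String) : DownloadResult()
data class Failed(val url: String, val error: Throwable) : DownloadResult()

// Hypothetical stand-in for the real download; fails for "bad" URLs.
fun fakeDownload(url: String): Observable<String> =
    if (url.contains("bad")) Observable.error(IOException("no network"))
    else Observable.just(url)

fun downloadAllCollectingErrors(urls: List<String>): List<DownloadResult> =
    Observable.fromIterable(urls)
        .flatMap { url ->
            fakeDownload(url)
                .map<DownloadResult> { Ok(it) }
                // The error is swallowed inside the inner source, so flatMap
                // never sees onError and never disposes the other sources.
                .onErrorReturn { Failed(url, it) }
        }
        .toList()
        .blockingGet()
```

Because no inner source ever terminates with an error, flatMap() keeps running the remaining downloads. That is exactly the trade-off mentioned above: the stream no longer stops at the first failure, so the subscriber has to inspect the `Failed` items itself.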

The other is to register a global error handler and ignore these errors.
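A minimal sketch of such a global handler, assuming RxJava 2. The function name `installUndeliverableErrorHandler` and the `log` callback are hypothetical, and deciding which exception types are safe to ignore is a judgment call for each app; here only I/O and interruption errors are swallowed.

```kotlin
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins
import java.io.IOException

// Hypothetical helper; call once at startup (e.g. from Application.onCreate).
fun installUndeliverableErrorHandler(log: (Throwable) -> Unit) {
    RxJavaPlugins.setErrorHandler { e ->
        // RxJava wraps errors it could not deliver; unwrap to see the real cause.
        val cause = if (e is UndeliverableException) e.cause ?: e else e
        when (cause) {
            // Expected once the stream was already disposed: network
            // failures and interrupted blocking calls. Log and move on.
            is IOException, is InterruptedException -> log(cause)
            // Anything else is likely a real bug: keep the crash behaviour.
            else -> {
                val thread = Thread.currentThread()
                thread.uncaughtExceptionHandler.uncaughtException(thread, cause)
            }
        }
    }
}
```

The handler is process-wide, so it affects every RxJava stream in the app, not just the download code.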

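I went with the latter, but I am not entirely happy with it. I really wish there were a way to register a local error handler, e.g. a method on Observable that ignores errors emitted after the Observable has been disposed.

Oh well.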
