【问题标题】:RxJava NetworkOnMainThreadException while subscribing订阅时出现 RxJava NetworkOnMainThreadException
【发布时间】:2021-03-25 11:27:37
【问题描述】:

我正在尝试调用 API 并将数据添加到我的 LiveData,但在我订阅后,我在 OnError 中得到 MainThreadException。尝试了不同的调度程序,但没有成功。添加了来自 onError 的堆栈跟踪。

private void pullLocation(){

    myLocationService.getLocation()
            .flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
                @Override
                public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
                    return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key");
                }
            })

            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(new Subscriber<DistanceResponseModel>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe in DistanceViewModel called. ");
        }

        @Override
        public void onNext(DistanceResponseModel distanceResponseModel) {
            distanceLiveData.postValue(distanceResponseModel);
            Log.d(TAG, "onNext in DistanceViewModel called. ");
        }

        @Override
        public void onError(Throwable t) {
            Log.e("YOUR_APP_LOG_TAG", "I got an error:", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete in DistanceViewModel called. ");
        }
    });
}

编辑:

添加了带有错误代码的详细堆栈跟踪:

E/YOUR_APP_LOG_TAG: I got an error
    android.os.NetworkOnMainThreadException
        at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1450)
        at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:102)
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:90)
        at java.net.InetAddress.getAllByName(InetAddress.java:787)
        at okhttp3.Dns.lambda$static$0(Dns.java:39)
        at okhttp3.-$$Lambda$Dns$9evC3uO-H_z08sS9O-4-hLhZ8es.lookup(Unknown Source:0)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:135)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84)
        at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:187)
        at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108)
        at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88)
        at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
        at okhttp3.RealCall.execute(RealCall.java:81)
        at retrofit2.OkHttpCall.execute(OkHttpCall.java:204)
        at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:46)
        at io.reactivex.Observable.subscribe(Observable.java:12030)
        at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:35)
        at io.reactivex.Observable.subscribe(Observable.java:12030)
        at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
        at io.reactivex.Single.subscribe(Single.java:3394)
        at io.reactivex.internal.operators.flowable.FlowableFlatMapSingle$FlatMapSingleSubscriber.onNext(FlowableFlatMapSingle.java:132)
        at io.reactivex.internal.operators.flowable.FlowableHide$HideSubscriber.onNext(FlowableHide.java:68)
        at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
        at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
        at com.example.compass.viewModels.MyLocationServiceClass$1.onLocationChanged(MyLocationServiceClass.java:35)
        at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
        at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
        at android.location.LocationManager$ListenerTransport$1.handleMessage(LocationManager.java:237)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:164)
        at android.app.ActivityThread.main(ActivityThread.java:6494)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)

这是我获得位置的地方:

public class MyLocationServiceClass implements MyLocationService {

    PublishProcessor<Location> stream = PublishProcessor.create();
    LocationListener listener = new LocationListener() {
        @Override
        public void onLocationChanged(@NonNull Location location) {
            stream.onNext(location);
            Log.d(TAG, "onLocationChanged: stream.onNext(location) called");
        }

        @Override
        public void onProviderEnabled(@NonNull String provider) {

        }

        @Override
        public void onProviderDisabled(@NonNull String provider) {

        }

        @Override
        public void onStatusChanged(String provider, int status, Bundle extras) {

        }
    };

    private LocationManager locationManager;
    private boolean flag;
    private Context context;

    public MyLocationServiceClass(Context context, LocationManager locationManager, boolean flag) {
        this.context = context;
        this.locationManager = locationManager;
        this.flag = flag;

    }

    @OnLifecycleEvent(Lifecycle.Event.ON_START)
    void onStart() {
        if (flag) {
            start();
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_STOP)
    void onStop() {
        locationManager.removeUpdates(listener);
    }


    @Override
    public Flowable<Location> getLocation() {
        return stream.hide();
    }

    @Override
    public void start() {
        long minTimeMs = 10000;
        float minDistanceM = 5.5F;


        if (ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_FINE_LOCATION) != PackageManager.PERMISSION_GRANTED && ActivityCompat.checkSelfPermission(context, Manifest.permission.ACCESS_COARSE_LOCATION) != PackageManager.PERMISSION_GRANTED) {

            return;
        }
        Log.d(TAG, "start(): requestLocationUpdates called.");
        locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, minTimeMs, minDistanceM, listener);

    }

    @Override
    public void updatePermission(boolean newState) {
        flag = newState;
    }

public interface MyLocationService extends LifecycleObserver {

    Flowable<Location> getLocation();

    void start();
    void updatePermission(boolean newState);
}

知道会是什么吗?如果您想了解更多详情,请告诉我。希望这会有所帮助。

带有完整代码的仓库:https://github.com/LightingTT/Compass

【问题讨论】:

  • 能否请您发布堆栈跟踪?
  • 好吧,我的代码可以编译,所以没有错误。但是,当我登录 onError 时,会抛出 NetworkOnMainThreadException。不确定粘贴整个日志是否有用。
  • 看看哪里抛出异常会很有趣。记录堆栈跟踪,而不仅仅是异常的名称。在那里,您还可以包含getLocation() 的关键部分。
  • 直接尝试distanceRepository.distanceResponseAPI(...).subscribeOn(Schedulers.io())。在flatMap 之后应用它不会影响您设置中的 lambda。
  • @akarnokd,该死的。我来不及了。是的,flatMapSingle 似乎是问题的根本原因(例如堆栈跟踪:flowable.FlowableFlatMapSingle)。正如@akarnokd 所建议的那样,我会提出同样的建议。如果你这行得通,我会写一个答案,为什么会这样,就像它一样。

标签: android rx-java rx-java2


【解决方案1】:

问题

调用distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key") 时,会向订阅者传播一个onError (NetworkOnMainThreadException)。

为什么会抛出异常?

禁止在 UI-Eventloop 上做网络请求,以免阻塞 UI。

为什么 subscribeOn 没有帮助?

subscribeOn 在给定调度器上调用上游操作符的 subscribeActual 方法。在您的情况下,订阅 flatMapSingle 运算符是从 IO-Scheduler 中的工作线程调用的。 flatMapSingle 在同一个线程上调用 myLocationService.getLocation()

正如您在堆栈跟踪中看到的那样,从getLocation 返回的 source-observable 的 onNext 是从 UI 线程发出的

    at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:361)
    at io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:244)
    at com.example.compass.viewModels.MyLocationServiceClass$1.onLocationChanged(MyLocationServiceClass.java:35)
    at android.location.LocationManager$ListenerTransport._handleMessage(LocationManager.java:292)
    at android.location.LocationManager$ListenerTransport.-wrap0(Unknown Source:0)
    at android.location.LocationManager$ListenerTransport$1.handleMessage(LocationManager.java:237)
    at android.os.Handler.dispatchMessage(Handler.java:106)
    at android.os.Looper.loop(Looper.java:164)

该值是通过调用线程(UI-Thread)上的 onNext 发出的,因为回调是从 UI-thread 调用的。 flatMapSingle 运算符也在调用 UI 线程上调用。它从 lambda 订阅返回的 Single 并在调用线程上订阅它。因此,UI 线程调用 UI 线程上的网络请求,Android 运行时抛出异常。 SubscribeOn 不确定,onNext 在给定Scheduler 的下游。它只确保订阅发生在此线程中。在您的情况下,您的 source-observable 将通过 UI-Thread 上的 Android 运行时获得通知。

解决方案?

myLocationService.getLocation() 之后使用observeOn 以便将线程从一个运算符切换到另一个运算符。 observeOn 确保 onNext 下游调用发生在给定的调度程序上。因此,订阅flatMapSingle 中的内部流将发生在给定的调度程序工作线程而不是 ui 线程上。您也可以在flatMapSingle 中的Single 上应用subscribeOn。这将确保网络调用将发生在给定的调度程序工作线程上。

        .flatMapSingle(new Function<Location, Single<DistanceResponseModel>>() {
            @Override
            public Single<DistanceResponseModel> apply(@NonNull Location location) throws Exception {
                return distanceRepository.distanceResponseAPI(location.getLatitude() + "," + location.getLongitude(), getDestinations(), "my_google_api_key")
                                         .subscribeOn(Schedulers.io());
            }
        })

进一步阅读

http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html?m=1

【讨论】:

    猜你喜欢
    • 2016-12-11
    • 1970-01-01
    • 1970-01-01
    • 2021-03-21
    • 1970-01-01
    • 2016-03-24
    • 1970-01-01
    • 1970-01-01
    • 2018-12-15
    相关资源
    最近更新 更多