【问题标题】:MissingBackpressureException even .onBackpressureDrop() is added?MissingBackpressureException 甚至 .onBackpressureDrop() 被添加?
【发布时间】:2017-07-31 07:38:52
【问题描述】:

我有 MissingBackpressureException 的问题。

我添加了几个 .onBackpressureDrop() 只是为了测试,但仍然出现异常。

我添加了 RxJavaHooks.enableAssemblyTracking() 以获得更多日志详细信息。

1-3 分钟后抛出异常。

知道这段代码有什么问题吗?

谢谢帮助。

抛出异常的代码:

  Subscription notifySubscription = connection.setupNotification(notifyCharacteristic)
                                    .onBackpressureBuffer()
                                    .doOnNext(new Action1<Observable<byte[]>>() {
                                        @Override
                                        public void call(Observable<byte[]> observable) {
                                            Log.d(TAG, "Notifications set, calling bypassConnect()");

                                            // Bypass connect in WB to make it aware of this new device
                                            String wbAddress = addressMap.getOrCreateWbAddress(bleMac);
                                            bleWrapper.bypassConnect(wbAddress);
                                        }
                                    })
                                    .onBackpressureDrop()
                                    .flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                                        @Override
                                        public Observable<byte[]> call(Observable<byte[]> observable) {
                                            return observable;
                                        }
                                    })
                                    .onBackpressureDrop()
                                    .subscribe(new Action1<byte[]>() {
                                        @Override
                                        public void call(byte[] bytes) {
                                            dataAvailable(bleMac, bytes);
                                        }
                                    }, new Action1<Throwable>() {
                                        @Override
                                        public void call(Throwable throwable) {
                                            Log.e(TAG, "dataAvailable() Error: ", throwable);
                                        }
                                    });

登录:

rx.exceptions.MissingBackpressureException
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:325)
at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:379)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:361)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at com.jakewharton.rxrelay.RelaySubscriptionManager$RelayObserver.onNext(RelaySubscriptionManager.java:205)
at com.jakewharton.rxrelay.PublishRelay.call(PublishRelay.java:47)
at com.jakewharton.rxrelay.SerializedAction1.call(SerializedAction1.java:84)
at com.jakewharton.rxrelay.SerializedRelay.call(SerializedRelay.java:20)
at com.polidea.rxandroidble.internal.connection.RxBleGattCallback$4.onCharacteristicChanged(RxBleGattCallback.java:139)
at android.bluetooth.BluetoothGatt$1.onNotify(BluetoothGatt.java:438)
at android.bluetooth.IBluetoothGattCallback$Stub.onTransact(IBluetoothGattCallback.java:399)
at android.os.Binder.execTransact(Binder.java:453)
Caused by: rx.exceptions.AssemblyStackTraceException: Assembly trace:
at rx.Observable.unsafeCreate(Observable.java:162)
at rx.Observable.lift(Observable.java:299)
at rx.Observable.merge(Observable.java:2572)
at rx.Observable.merge(Observable.java:2914)
at rx.Observable.merge(Observable.java:2637)
at com.polidea.rxandroidble.internal.connection.RxBleGattCallback.getOnCharacteristicChanged(RxBleGattCallback.java:311)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager.observeOnCharacteristicChangeCallbacks(NotificationAndIndicationManager.java:186)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager.access$400(NotificationAndIndicationManager.java:31)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager$1$1.call(NotificationAndIndicationManager.java:110)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager$1$1.call(NotificationAndIndicationManager.java:107)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerScalarProducer.request(OnSubscribeConcatMap.java:366)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:278)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
at rx.in

【问题讨论】:

    标签: android rx-java rx-android rxandroidble


    【解决方案1】:

    看起来rxandroidble 返回的Observable 不支持背压,但flatMap 期望它,因此您必须在flatMap 中应用.onBackpressureDrop

    .flatMap(observable -> observable.onBackpressureDrop())
    

    请注意,outside flatMap 应用运算符通常不会影响合并源时发生的内部情况。

    【讨论】:

    • 不能使用 java 8。你是说这个吗?返回 observable.onBackpressureDrop() ?
    • 如果是。我仍然在 flatMap 中使用 onBackpressureDrop() 获得 MissingBackpressureException。
    • 程序集跟踪指向RxBleGattCallback.withHandlingStatusErrorAndDisconnection,但当前文件看起来不同。也许您使用的是旧版本?
    • 显然,您已经在 their issue list 上发布了此内容。
    • 是的,我在他们的问题列表中创建了 BUG,因为我自己无法在代码中找到错误。无论如何谢谢@akarnokd的帮助
    猜你喜欢
    • 2016-08-20
    • 1970-01-01
    • 1970-01-01
    • 2021-11-12
    • 2014-03-18
    • 1970-01-01
    • 1970-01-01
    • 2013-06-23
    • 1970-01-01
    相关资源
    最近更新 更多