【问题标题】:RxJava2/RxAndroidBle: subscribe to Observable from side effectsRxJava2/RxAndroidBle:订阅 Observable 的副作用
【发布时间】:2019-01-04 00:35:02
【问题描述】:

我有以下使用RxAndroidBle 的简单 BLE 设备设置过程的用例:

  1. 连接到 BLE 设备。
  2. 开始监听通知特性并设置解析器来解析每个传入的通知。然后解析器将使用PublishSubject 发布解析后的数据。
  3. 执行写入到写入特性(协商安全连接)。
  4. 等待解析器 PublishSubject 传递来自设备的解析响应 - 公钥(它通过通知特性到达,作为对我们写入的响应)。
  5. 对写入特征执行另一次写入(将连接设置为安全)。
  6. 发送Completable 说明该过程是否成功完成。

现在我的解决方案(不起作用)如下所示:

deviceService.connectToDevice(macAddress)
    .andThen(Completable.defer { deviceService.setupCharacteristicNotification() })
    .andThen(Completable.defer { deviceService.postNegotiateSecurity() })
    .andThen(Completable.defer {
        parser.notificationResultSubject
            .flatMapCompletable { result ->
                when (result) {
                    DevicePublicKeyReceived -> Completable.complete()
                    else -> Completable.error(Exception("Unexpected notification parse result: ${result::class}"))
                }
            }
    })
    .andThen(Completable.defer { deviceService.postSetSecurity() })

还有DeviceService 类:

class DeviceService {

    /**
     * Observable keeping shared RxBleConnection for reuse by different calls
     */
    private var connectionObservable: Observable<RxBleConnection>? = null

    fun connectToDevice(macAddress: String): Completable {
        return Completable.fromAction {
            connectionObservable = 
                rxBleClient.getBleDevice(macAddress)
                .establishConnection(false) 
                .compose(ReplayingShare.instance())
        }
    }

   fun setupCharacteristicNotification(): Completable =
        connectionObservable?.let {
            it
                .switchMap { connection ->
                    connection.setupNotification(UUID_NOTIFICATION_CHARACTERISTIC)
                        .map { notificationObservable -> notificationObservable.doOnNext { bytes -> parser.parse(bytes) }.ignoreElements() }
                        .map { channel ->
                            Observable.merge(
                                Observable.never<RxBleConnection>().startWith(connection),
                                channel.toObservable()
                            )
                        }
                        .ignoreElements()
                        .toObservable<RxBleConnection>()
                }
                .doOnError { Timber.e(it, "setup characteristic") }
                .take(1).ignoreElements()
        } ?: Completable.error(CONNECTION_NOT_INITIALIZED)

   fun postNegotiateSecurity(): Completable {
        val postLength = negotiateSecurity.postNegotiateSecurityLength()
        val postPGK = negotiateSecurity.postNegotiateSecurityPGKData()

        return connectionObservable?.let {
            it.take(1)
                .flatMapCompletable { connection ->
                    postLength
                        .flatMapSingle { connection.write(it.bytes.toByteArray()) }
                        .doOnError { Timber.e(it, "post length") }
                        .flatMap {
                            postPGK
                                .flatMapSingle { connection.write(it.bytes.toByteArray()) }
                                .doOnError { Timber.e(it, "post PGK") }
                        }
                        .take(1).ignoreElements()
                }
        } ?: Completable.error(CONNECTION_NOT_INITIALIZED)
    }

    fun postSetSecurity(): Completable =
        connectionObservable?.let {
            it.take(1)
                .flatMapCompletable { connection ->
                    negotiateSecurity.postSetSecurity()
                        .flatMapSingle { connection.write(it.bytes.toByteArray()) }
                        .take(1).ignoreElements()
                }
        } ?: Completable.error(CONNECTION_NOT_INITIALIZED)
   }

private fun RxBleConnection.write(bytes: ByteArray): Single<ByteArray> =
    writeCharacteristic(UUID_WRITE_CHARACTERISTIC, bytes)

问题是它卡在deviceService.postNegotiateSecurity() 中并且永远不会过去。我也没有在解析器中获得任何数据,所以我认为我错误地订阅了通知特性。

negotiateSecurity.postNegotiateSecurityLength()negotiateSecurity.postNegotiateSecurityPGKData() 是准备要发送的数据并将其作为Observable&lt;SendFragment&gt; 传递的方法。由于数据帧大小的限制,一帧可能会被编码为多个片段,然后由这些Observables 发出。

【问题讨论】:

    标签: rx-java2 rxandroidble


    【解决方案1】:

    回顾:

    • postNegotiateSecurity() 永远不会完成
    • negotiateSecurity.postNegotiateSecurityLength() 可能会发射一次或多次
    • negotiateSecurity.postNegotiateSecurityPGKData() 可能会发出一次或多次

    分析(为便于阅读省略了日志):

    it.take(1)
        .flatMapCompletable { connection ->
            postLength
                .flatMapSingle { connection.write(it.bytes.toByteArray()) }
                .flatMap {
                    postPGK // may emit more than one value
                        .flatMapSingle { connection.write(it.bytes.toByteArray()) }
                }
                .take(1) // first emission from the above `flatMap` will finish the upstream
                .ignoreElements()
        }
    

    来自postLength 的每个发射都将开始一个特征写入。每个成功的写入都将开始订阅postPGK。如果postLength 发出不止一次 - 将会有更多postPGK 订阅。

    每次订阅postPGK 很可能会导致多次排放。然后,每个发射都将被 flatMapped 到一个特征写入。每个写入成功的写入都会发出一个值。

    在上述特征写入的第一次发射后,上游将被处理(因为.take(1) 运算符)。

    如果postNegotiateSecurity() 实际启动,它也会完成或出错(假设postLengthpostPGK 都会发出至少一个值),因为这里没有额外的逻辑。

    结论

    postNegotiateSecurity() 很可能会完成(但不是以预期的方式),因为来自postPGK 的第一个数据包将完成它。我会假设外围设备在通知任何内容之前需要完整的数据,因此它正在等待 PGK 完全传输,这不会发生如上所示。

    设置了RxBleLog.setLogLevel(RxBleLog.VERBOSE) 的应用程序日志有助于了解实际发生的情况。

    【讨论】:

    • 感谢您的回答!原来罪魁祸首是take(1),它中断了连接。我的架构也存在其他一些问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-22
    • 2023-03-23
    • 1970-01-01
    相关资源
    最近更新 更多