【问题标题】:Using RxAndroidBle, how do I subscribe to responses from writing to a characteristic?使用 RxAndroidBle,我如何订阅响应写入特性?
【发布时间】:2016-12-15 04:08:35
【问题描述】:

我要连接的 BLE 设备在其 GATT 特性之一上发出字节,以响应对该特性的写入。客户端应该启用有关该特征的通知,并解释该特征的更改字节。 (我控制的行为是打开附近无线网络的扫描服务,然后监听服务输出。)

我正在使用 RxAndroidBle 并关注 examples。我有一个活动连接 Observable。我要观察的特征有一个名为AP_SCAN_DATA 的UUID。它应该发出0xFE 以响应接收到书面0xFF

我如何调用setupNotification并在其上设置一个观察者来捕获发出的byte[]s,然后将一个值写入特征,以便我可以捕获响应?

到目前为止我的最大努力:

connectionObservable.observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<RxBleConnection>() {
                @Override
                public void onCompleted() { // ignore...
                }

                @Override
                public void onError(Throwable e) { // ignore...
                }

                @Override
                public void onNext(final RxBleConnection connection) {
                    Observable.just(connection)
                              .flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() {
                                  @Override
                                  public Observable<Observable<byte[]>> call(RxBleConnection connection) {
                                      return connection.setupNotification(AP_SCAN_DATA);
                                  }
                            })
                            .doOnNext(new Action1<Observable<byte[]>>() {
                                @Override
                                public void call(Observable<byte[]> observable) {
                                    Log.i(TAG, "notification has been set up");
                                    // This code logs on DEBUG that a write was made, but no response ever arrives 
                                    connection.writeCharacteristic(AP_SCAN_DATA, CharacteristicValue.RESET.asBytes())
                                            .observeOn(AndroidSchedulers.mainThread())
                                            .subscribe();

                                }
                            })
                            .flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                                @Override
                                public Observable<byte[]> call(Observable<byte[]> observable) {
                                    return observable;
                                }
                            })
                            .doOnNext(new Action1<byte[]>() {
                                @Override
                                public void call(byte[] bytes) {
                                    Log.i(TAG, "want to read response bytes here, but I don't... " + HexString.bytesToHex(bytes));
                                }
                            })
                            .subscribe();
                }
            });

【问题讨论】:

    标签: android rxandroidble


    【解决方案1】:

    已经有一个话题你可以从中找到一些见解 -> RxAndroidBle keeping a persistant connection + Write/Notification handling

    这就是您如何在仅使用单个 .subscribe() 时获得相同结果的方法。

        connectionObservable
                .flatMap( // when the connection is available...
                        rxBleConnection -> rxBleConnection.setupNotification(AP_SCAN_DATA), // ... setup the notification...
                        (rxBleConnection, apScanDataNotificationObservable) -> Observable.combineLatest( // ... when the notification is setup...
                                rxBleConnection.writeCharacteristic(AP_SCAN_DATA, writeValue), // ... write the characteristic...
                                apScanDataNotificationObservable.first(), // ... and observe for the first notification on the AP_SCAN_DATA
                                (writtenBytes, responseBytes) -> responseBytes // ... when both will appear return just the response bytes...
                        )
                )
                .flatMap(observable -> observable) // ... flatMap the result as it is Observable<byte[]>...
                .first() // ... and finish after first response is received to cleanup notifications
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        responseBytes -> { /* consume the response here */ },
                        throwable -> { /* handle exception */ }
                );
    

    仅供参考 - 您应该处理每个 .subscribe() 中的错误,除非您 100% 确定 Observable 不会发出错误。

    【讨论】:

      【解决方案2】:

      对于那些不使用支持 lambda 的 Java 版本的读者,这是我对 @s_noopy 答案的实现。

      connectionObservable
          .flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() {
                  @Override
                  public Observable<Observable<byte[]>> call(RxBleConnection connection) {
                      return connection.setupNotification(AP_SCAN_DATA);
                  }             
              }, new Func2<RxBleConnection, Observable<byte[]>, Observable<byte[]>>() {
                  @Override
                  public Observable<byte[]> call(RxBleConnection connection, Observable<byte[]> apScanDataNotificationObservable) {
                      return Observable.combineLatest(
                          connection.writeCharacteristic(AP_SCAN_DATA, CharacteristicValue.RESET.asBytes()),
                          apScanDataNotificationObservable.first(),
                          new Func2<byte[], byte[], byte[]>() {
                              @Override
                              public byte[] call(byte[] writtenBytes, byte[] responseBytes) {
                                          return responseBytes;
                                      }
                                  }
                              );
                          }
                      }
                  ).flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                      @Override
                      public Observable<byte[]> call(Observable<byte[]> observable) {
                          return observable;
                      }
                  })
                  .first()
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(new Action1<byte[]>() {
                      @Override
                      public void call(byte[] bytes) {
                          Log.i(TAG, "notification response...." + HexString.bytesToHex(bytes));
                      }
                  }, new Action1<Throwable>() {
                      @Override
                      public void call(Throwable throwable) {
                          logError(throwable);
                      }
                  });
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2018-01-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-04-07
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多