【问题标题】:Sending the list of commands to device using RxAndroidBle (rxJava)使用 RxAndroidBle (rxJava) 向设备发送命令列表
【发布时间】:2016-07-07 16:20:17
【问题描述】:

我正在尝试通过 rxJava 向设备发送命令列表。这是我的代码:

public void startWriteCommucation(final ArrayList<byte[]> b) {
    if (isConnected()){
            connectionObservable
                    .flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() {
                        @Override
                        public Observable<Observable<byte[]>> call(final RxBleConnection rxBleConnection) {
                            final List<Observable<byte[]>> list = new ArrayList<>();
                            for (byte[] bytes: b){
                                Log.e("Observer", Arrays.toString(bytes));
                                list.add(rxBleConnection
                                        .writeCharacteristic(BleDevice.characteristicWrite, bytes));
                            }
                            return Observable.from(list);
                        }
                    })
                    .concatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                        @Override
                        public Observable<byte[]> call(Observable<byte[]> observable) {
                            return observable;
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<byte[]>() {
                        @Override
                        public void call(byte[] bytes) {
                            view.setTextStatus("Write success");
                            Log.e("Subscriber", Arrays.toString(bytes));
                        }
                    });
        }
}

它有效,然后我单击一次按钮。比如我的clikc方法:

 public void onClick(){
        ArrayList<byte[]> listCmd = new ArrayList<>();
        listCmd.add(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
        listCmd.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
        startWriteCommucation(listCmd);
}

以及 LogCat 中的 myLogs:

E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

但是当我使用快速双击按钮时会出现问题。然后第一次单击 observable 仍然有效,我再次单击以再次调用 startWriteCommunication 方法。在此之后,我的日志看起来是这样的:

 E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

 E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

主要问题是它们不按顺序排列,而且我的设备工作不正确。你能帮忙找出问题吗?

【问题讨论】:

    标签: android rx-java rx-android rxandroidble


    【解决方案1】:

    问题在于 RxAndroidBle 库错误(导致响应与请求不匹配)并在两个有状态的通信流之间共享连接(需要按顺序进行两次写入,而两者之间没有任何通信)。

    错误:应该写入 BluetoothGattCharacteristic 的值 (byte[]) 设置得太早。如果有两个并行写入器具有相同的特性 - 其中一个可能会覆盖由于竞争条件而由另一个设置的 byte[]。我已经对库进行了修复,该库现在正在代码审查过程中,应该在不久的将来应用于 SNAPSHOT 版本。

    更改后的输出将如下所示:

    D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    
    D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    

    可能的解决方案

    如果您对触发两次流不感兴趣,如果用户将快速点击按钮两次 - 您可以创建一个可共享流:

    Observable<byte[]> theSharedFlow = rxBleConnection
      .writeCharacteristic(uuid, data1)
      .flatMap(writtenBytes -> rxBleConnection.writeCharacteristic(uuid, data2))
      .share()
    

    多次订阅时只会执行一次,直到完成。在上面的 sn-p 中,第二个writeCharacteristic() 将在第一个发送写入字节之后被订阅(并排队进行通信)。

    如果应用程序要在共享连接时按顺序在任意时间按顺序发送任意组命令,则应用程序应确保前一组命令已完成。

    我希望我已经回答了你的问题。如果您愿意提供有关用例的更多信息,我会尝试改进我的答案。

    最好的问候

    编辑:

    替代方案:

    要保留订单,所有 Observable 都需要订阅才能到达。 Observable 的合约是 Observable 的(如果它是冷的)直到订阅才执行。当使用flatMap() 时,一旦第一个 Observable 发出,第二个 Observable 就会被订阅。

    要使两个写入按顺序传输,它们必须以相同的顺序订阅,因此流程可能如下所示:

    connectionObservable
                .flatMap(rxBleConnection -> {
                    Observable<byte[]> mergedObservable = null;
                    for (byte[] bytes : b) {
                        Log.d("Observer", Arrays.toString(bytes));
                        final Observable<byte[]> writeObservable = rxBleConnection
                                .writeCharacteristic(uuid, bytes);
    
                        if (mergedObservable == null) {
                            mergedObservable = writeObservable;
                        } else {
                            // merging two Observables to be subscribed at the same time when subscribed
                            mergedObservable = mergedObservable.mergeWith(writeObservable);
                        }
                    }
                    return mergedObservable;
                })
                // removed .concatMap()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        bytes -> Log.d("Subscriber", Arrays.toString(bytes)),
                        throwable -> Log.e("Subscriber", "error", throwable)
                );
    

    RxJava 显然有更多方法来实现相同的行为,但这不是这个问题的一部分。

    【讨论】:

    • 谢谢,我明天试试
    • 但我想订阅者的输出将与观察者的顺序相同
    • 我已经编辑了我的回复。替代解决方案修复了该行为,尽管在我看来这只是不正确流的表现(有状态流的两个并行执行)。
    猜你喜欢
    • 1970-01-01
    • 2023-01-16
    • 2016-12-15
    • 1970-01-01
    • 1970-01-01
    • 2019-06-18
    • 1970-01-01
    • 1970-01-01
    • 2017-12-18
    相关资源
    最近更新 更多