【问题标题】:Notifying of error events down the chain without use of traditional listener在不使用传统监听器的情况下通知错误事件
【发布时间】:2016-12-14 19:02:43
【问题描述】:

我有一个相当复杂的 Rx 代码链,用于执行许多操作。本质上,当启动时,Observable 开始发出项目列表。对于每个项目,都会建立一个连接(这些项目是 BLE 设备)并写入一个特性。

此特性每 10 秒重写一次,直到发生错误(例如拔出电池)。之后,连接原始列表中的下一项,以此类推。

这里是大部分动作发生的地方:

public Observable<String> connectForPolice(String name, RemovalListener listener) {

        StringBuilder builder = new StringBuilder(mAddress);
        builder.setCharAt(mAddress.length() - 1, '4');
        RxBleDevice device = mRxBleClient.getBleDevice(builder.toString());

        return device.establishConnection(mContext, false)
                .timeout(12, TimeUnit.SECONDS)
                .doOnError(throwable -> {
                    Log.d(TAG, "Error thrown after timeout.");
                    throwable.printStackTrace();
                })
                .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() {
                    @Override
                    public Observable<byte[]> call(RxBleConnection rxBleConnection) {
                        byte[] value = new byte[1];
                        value[0] = (byte) (3 & 0xFF);

                        return Observable // ... we return an observable ...
                                .defer(() -> {
                                    return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value);
                                })
                                .repeatWhen(observable -> {
                                    return observable.delay(10, TimeUnit.SECONDS);
                                });
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .onErrorResumeNext(throwable -> {
                        throwable.printStackTrace();
                        listener.onFinished(name);
                    return Observable.empty();
                });
    }

这里有一些问题。可以看到,这个方法传入了一个传统的监听器,在onErrorResumeNext中调用了这个监听器。此侦听器用于从 Activity 中的 RecyclerView 中删除项目,但这并不重要。问题是,以这种方式使用监听器有点像 Rx 的反模式。下面是其余的相关代码,包括调用上述例子的代码:

    @Override
    public void onFinished(String deviceName) {
        mAdapter.removeItem(deviceName);
    }

    private void startConnecting() {
        mIsBeepingAll = true;
        mConnectingSubscription = Observable.from(mAdapter.getItems())
                .flatMap((Func1<Device, Observable<?>>) device -> {
                    Log.d(TAG, "connecting for policing");
                    return device.connectForPolice(device.getName(), PoliceActivity.this);
                }, 1)
                .doOnError(throwable -> throwable.printStackTrace())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted...");
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        Log.d(TAG, "onError...");
                    }

                    @Override
                    public void onNext(Object o) {
                        Log.d(TAG, "onNext... ");
                    }
                });
    }

包括监听器的实现。问题是,我怎样才能执行与使用 Rx 代替侦听器所做的等效操作?

【问题讨论】:

    标签: rx-java rx-android rxandroidble


    【解决方案1】:

    最简单的方法是改变这个:

                .onErrorResumeNext(throwable -> {
                    throwable.printStackTrace();
                    listener.onFinished(name);
                    return Observable.empty();
                });
    

    进入这个:

                .onErrorResumeNext(throwable -> {
                    throwable.printStackTrace();
                    return Observable.just(name);
                });
    

    调用者会使用:

    private void startConnecting() {
        mIsBeepingAll = true;
        mConnectingSubscription = Observable.from(mAdapter.getItems())
                .flatMap((Func1<Device, Observable<?>>) device -> {
                    Log.d(TAG, "connecting for policing");
                    return device.connectForPolice(device.getName(), PoliceActivity.this);
                }, 1)
                .doOnError(throwable -> throwable.printStackTrace())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted...");
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        Log.d(TAG, "onError...");
                    }
    
                    @Override
                    public void onNext(String deviceName) {
                        Log.d(TAG, "onNext... ");
                        mAdapter.removeItem(deviceName);
                    }
                });
    }
    

    【讨论】:

    • 这是我原本打算做的。我假设需要Observable.empty(),以便立即调用onCompleted(),以便重新订阅以开始对列表中的下一个项目进行操作。不是这样吗?我将不得不再次对此进行测试。
    • 确实如此。 Observable.empty() 什么都不发出——只是完成了。 Observable.just(object) 发出一个 object 然后完成。使用它是因为您依赖于外部断开连接(在正常情况下这是一个例外)并且不想出错整个流。
    • 啊,好吧,我没想到Observable.just() 会立即完成。
    猜你喜欢
    • 2022-01-14
    • 2015-09-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多