【问题标题】:Dynamic switching between RxJava observablesRxJava observables 之间的动态切换
【发布时间】:2019-03-14 09:16:21
【问题描述】:

我有两个 observable:一个发出从蓝牙连接接收到的 ByteArrays,另一个发出周期性服务器请求的结果。

fun observeBluetooth():Observable<ByteArray>
fun observeServer():Observable<ByteArray>

蓝牙连接状态可用(作为布尔变量或作为 Observable)

我需要结合 observables 考虑到以下要求:

  1. 如果蓝牙已连接,则必须发出从 observeBluetooth() 接收到的数据(并且不得发出服务器请求)
  2. 如果未连接蓝牙,则从observeServer() 接收到的数据 必须发出
  3. 如果蓝牙连接恢复,则必须再次发出从 observeBluetooth() 接收到的数据

如何使用 RxJava/Kotlin 做到这一点?

【问题讨论】:

    标签: kotlin rx-java reactive-programming


    【解决方案1】:

    如果你有一个可观察的isBtConnected,你可以切换它:

    val isBtConnected: Observable<Boolean> = ...
    isBtConnected.switchMap {
        if (it) observeBluetooth()
        else observeServer()
    }
    

    【讨论】:

    • 您可能会认为,如果您不关心蓝牙或服务器可观察对象发出的先前数据。如果您确实关心这一点,则应该使用flatMap 而不是switchMap 可能medium.com/appunite-edu-collection/…
    • @DiegoAlejandro 这个例子并没有真正的可比性。当它关闭时,你不应该观察 BT。无论如何,您可能无法从那里获得任何数据。 switchMap 确保在服务器 observable 开始发射时将其处理掉。可能有更好的方法,只是 flatMap 并不理想,因为它假定所有来源始终处于活动状态。
    • 是的,switchMap() 是我需要的。谢谢
    【解决方案2】:

    听起来您可以根据蓝牙状态进行过滤。如果是共享原子变量:

    final AtomicBoolean isConnected = new AtomicBoolean();
    
    Observable.merge(
        observeBluetooth()
           .filter(v -> isConnected.get()),
        observeServer()
           .filter(v -> !isConnected.get())
    )
    // ... etc.
    

    如果状态也是可观察的,它会变得有点复杂,因为您需要扩展项目中的 valve 运算符:

    Observable<Boolean> isConnectedSource = ...
    
    Observable<Boolean> shared = isConnectedSource.publish().refCount(2);
    
    Observable.merge(
        observeBluetooth()
           .compose(ObservableTransformers.valve(shared)),
        observeServer()
           .compose(ObservableTransformers.valve(shared.map(v -> !v), false))
    )
    // ... etc.
    

    请注意,valve 将在关闭时暂停并继续缓冲项目。如果您需要从其他序列中删除项目,则必须结合两种方法:

    Observable<Boolean> isConnectedSource = ...
    
    final AtomicBoolean isConnected = new AtomicBoolean();
    
    Disposable status = isConnectedSource.subscribe(v -> isConnected.set(v));
    
    Observable.merge(
        observeBluetooth()
           .filter(v -> isConnected.get()),
        observeServer()
           .filter(v -> !isConnected.get())
    )
    // ... etc.
    
    status.dispose();
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-02-02
      • 2012-06-03
      • 2021-08-29
      • 2012-08-13
      • 1970-01-01
      • 2019-08-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多