【问题标题】:RxJava: Unsubscribe async observable from within another async productionRxJava:从另一个异步生产中取消订阅异步可观察
【发布时间】:2016-07-10 19:43:47
【问题描述】:

取消订阅异步Observable 的最佳(干净)方法是在另一个异步的flatMap 中产生的? 说,我有一个显示用户聊天列表的演示文稿。 每个聊天项目都应显示其自身的最新消息。 这是一个原始代码来说明:

fun AsyncInsideAsync() {
    /**
     * A chat that 'transmits' latest message
     */
    class Chat(val name: Int) {
        val lastMessage: PublishSubject<String> = PublishSubject.create()

        fun postMessage(message: Int) {
            lastMessage.onNext("Chat: $name, Message: $message")
        }
    }

    // List of chats for user.
    val chats: PublishSubject<Chat> = PublishSubject.create()

    ///////////////////////////////////
    //        ORIGINAL SEQUENCE      //
    ///////////////////////////////////
    val sequence = chats.flatMap { it.lastMessage }

    val subscriber = TestSubscriber<String>()
    sequence.subscribe(subscriber)

    // User has single chat in a chat-list
    val chat1 = Chat(1)
    chats.onNext(chat1)

    // Someone posts a message in Chat 1
    chat1.postMessage(1)
    subscriber.assertValues(
            "Chat: 1, Message: 1"
    )

    // Someone posts another message
    chat1.postMessage(2)
    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2"
    )

    // Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created
    val chat2 = Chat(2)
    chats.onNext(chat2)

    // Someone posts a message to Chat 2
    chat2.postMessage(1)
    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2",
            "Chat: 2, Message: 1"
    )

    // Someone out there posts a message to Chat 1 that is not visible to user anymore
    chat1.postMessage(3)

    // The answer looks like this 
    //      "Chat: 1, Message: 1",
    //      "Chat: 1, Message: 2",
    //      "Chat: 2, Message: 1",
    //      "Chat: 1, Message: 3"
    // Chat 1 is still subscribed and test fails
    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2",
            "Chat: 2, Message: 1",
    )
}

我想出的是使用一个主题(或共享的 observable)来阻止一系列内部订阅。但它看起来很奇怪:

    ///////////////////////////////////
    //        MODIFIED SEQUENCE      //
    ///////////////////////////////////
    val unsubscribe: PublishSubject<Boolean> = PublishSubject.create()
    val sequence = chats
            .doOnNext({ unsubscribe.onNext(true) })
            .doAfterTerminate({ unsubscribe.onNext(true) })
            .flatMap {
                it.lastMessage.takeUntil(unsubscribe)
            }

这种方法有效,但看起来很吓人。 非常感谢!

【问题讨论】:

  • 出于某种原因,我认为这不是 Rx 的好匹配;如果您使用事件总线对用例进行建模,也许您会做得更好?

标签: rx-java kotlin


【解决方案1】:

假设评论

// Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created

表示您希望当前的Chat(在这种情况下为chat1)在chats 上发送新的Chat 时停止发射,您可以使用switchMap 运算符完成此操作。

val sequence = chats.switchMap { it.lastMessage }

来自 ReactiveX 文档:

RxJava 也实现了 switchMap 操作符。它的行为很像 flatMap,除了每当源发出新项目时 Observable,它将取消订阅并停止镜像 Observable 从先前发出的项目生成,并且仅开始 镜像当前的。

【讨论】:

  • 非常感谢!这很明显:)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-07-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多