【问题标题】:Observable to reuse operator executions可观察以重用运算符执行
【发布时间】:2016-07-05 15:59:21
【问题描述】:

给出以下示例(kotlin 代码)

val subject = PublishSubject.create<Int>()

val stream = subject.map {
    println("mapping")
    it * 2
}

stream.forEach { println("A: $it") }
stream.forEach { println("B: $it") }

subject.onNext(1)
subject.onCompleted()

输出将是

mapping
A: 2
mapping
B: 2

我想要实现的是源 observable 被映射一次,所有订阅者都得到结果,但不为每个订阅者执行映射操作......

喜欢这个

mapping
A: 2
B: 2

在我的情况下,我在延迟和性能至关重要的地方进行了非常昂贵的计算,我有一个热门的 observable 作为来源和很多订阅者...

我们如何重用操作符执行?以及通常不同的映射操作?

【问题讨论】:

    标签: java rx-java reactive-programming kotlin


    【解决方案1】:

    您可以使用 cache 将源 observable 的结果缓存给任何未来的订阅者:

    val stream = subject.map {
        println("mapping")
        it * 2
    }.cache()
    

    如果您想更微妙地控制缓存的方式,replay 值得研究。

    如果您不想缓存源 observable 的每个项目,而只想 重新发布新项目,您可以使用 publishautoConnect

    val stream = subject.map {
        println("mapping")
        it * 2
    }.publish()
     .autoConnect()
    

    给出以下事件序列:

    stream.forEach { println("A: $it") }
    stream.forEach { println("B: $it") }
    
    subject.onNext(1)
    
    stream.forEach { println("C: $it") }
    subject.onNext(2)
    subject.onCompleted()
    

    将打印:

    mapping
    A: 2
    B: 2
    mapping
    A: 4
    B: 4
    C: 4
    

    【讨论】:

    • 哦,天哪,这真的很糟糕,我刚刚检查了运营商的来源,每个订阅者都执行链中的所有内容......缓存不符合我的要求,我不想将所有排放存储在任何地方,我只是希望它计算并向所有人传播,而不是为所有人计算:(
    • 我知道这是在寻求帮助,但是请您复制和修改 OperatorMap(基本映射运算符),以便它只调用一次转换器...我只需要看看什么是正确的方法这样做......如果有的话
    • 调试工作流,似乎我对它的工作原理有错误的理解,原始源 observable 有两个观察者,所以该划分来自根,我希望它有一个订阅者,而这个在轮到它还有其他 2...
    • 还有其他 RX 框架不能以这种方式工作吗?这就是这样设计的……每个订阅者都为自己执行整个链工作,没有任何东西被重用……非常糟糕的设计恕我直言,特别是考虑到 RX 打算我们使用纯函数和不可变数据……为什么要这样做为 100 个订阅者工作,如果它应该只做一次......
    • 很好,尽管我坚持我的解决方案,但我会接受这个作为答案,因为它至少会创建更少的对象并且看起来不像黑客。此外,您的解决方案适用于纯 java cos,您可以使用它而不会破坏链条,而我的解决方案只有在您拥有称为“扩展功能”的出色功能时才有效......感谢您的回答:)
    【解决方案2】:

    我找到了解决方案。为了重用管道的执行,我们必须确保只有一个订阅者,并且该订阅者将来自管道末端的所有排放传播到所有订阅者的条目......这听起来很像主题!

    如果我们只订阅 100 次,我们将从 observable 的源头开始有 100 个管道,而在这种情况下,我们有一个管道,它在它的最后分支到 100 个小管道...

    fun <T> Observable<T>.hub(): Observable<T> {
        val hub = PublishSubject.create<T>()
        this.subscribe(hub)
        return hub
    }
    

    现在我们可以这样做了

    val subject = PublishSubject.create<Int>()
    
    val stream = subject.map {
        println("mapping")
        it * 2
    }
    
    val hub = stream.hub()
    
    hub.subscribe { println("A: $it") }
    hub.subscribe { println("B: $it") }
    
    subject.onNext(1)
    
    subject.onCompleted()
    

    这会给出这个

    mapping
    A: 2
    B: 2
    

    问题解决了!

    【讨论】:

      猜你喜欢
      • 2015-11-14
      • 1970-01-01
      • 2017-12-17
      • 2020-11-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多