【问题标题】:Merging kotlin flows合并 kotlin 流
【发布时间】:2020-01-01 17:45:07
【问题描述】:

给定 2 个或更多具有相同类型的流,是否有现有的 Kotlin 协程函数来合并它们,例如 RX 合并运算符?

目前我正在考虑这个:

fun <T> merge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    val flowJobs = flows.map { flow ->
        GlobalScope.launch { flow.collect { send(it) } }
    }
    flowJobs.joinAll()
}

但它似乎有些笨拙。

【问题讨论】:

    标签: kotlin kotlin-coroutines


    【解决方案1】:

    我还不太熟悉流程,所以这可能不是最理想的。无论如何,我认为您可以创建所有输入流的流,然后使用flattenMerge 将它们再次展平为单个流。像这样的:

    fun <T> merge(vararg flows: Flow<T>): Flow<T> = flowOf(*flows).flattenMerge()
    

    编辑:

    merge-function 在 1.3.3 版本中被添加到 kotlinx-coroutines 中。见这里:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html

    【讨论】:

    • 看起来效果很好!正如你所说,不确定它有多优化,但现在它是完美的。
    • 您应该知道,与问题中提供的代码不同,这默认限制为同时运行 16 个流,请参阅 flattenMerge 处的 concurrency 参数。
    【解决方案2】:

    现在(撰写本文时,Coroutines 版本 1.3.5)是 Coroutines 库的一部分。

    你可以这样使用它:

    val flowA = flow { emit(1) } 
    val flowB = flow { emit(2) }
    
    merge(flowA, flowB).collect{ println(it) } // Prints two integers
    // or:
    listOf(flowA, flowB).merge().collect { println(it) } // Prints two integers
    

    您可以在the source code阅读更多内容

    【讨论】:

      【解决方案3】:

      可能已经晚了,但我相信这可能是一个可行的解决方案:

      fun <T> combineMerge(vararg flows: Flow<T>) = flow {
          coroutineScope {
              flows.forEach {
                  launch {
                      it.collect {
                          emit(it)
                      }
                  }
              }
          }
      }
      
      fun <T> combineConcat(vararg flows: Flow<T>) = flow {
          flows.forEach {
              it.collect {
                  emit(it)
              }
          }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-08-21
        • 2023-01-27
        • 2021-04-21
        • 1970-01-01
        • 1970-01-01
        • 2019-12-25
        • 2016-09-12
        • 1970-01-01
        相关资源
        最近更新 更多