【问题标题】:Waiting for all Flows in a loop finished等待循环中的所有流完成
【发布时间】:2020-07-28 07:44:22
【问题描述】:

我有一个将数据作为流返回的 API:

suspend fun loadData(...): Flow<List<Items>> 
suspend fun loadDetails(...): Flow<ItemDetails>

当我获取数据时,我需要加载一些项目的详细信息并将结果转换为实时数据:

job = SupervisorJob()
val context = CoroutineScope(job + Dispatchers.Main.immediate).coroutineContext
liveData(context = context) {
  emitSource(
        api.loadData(...)
           .flatMapConcat { items ->
              items.forEach { item -> 
                 if (item is Details){
                    api.loadDetails(item).flatMapConcat{ details ->
                        item.details = details
                    }
                 }
              }
             flow {
               emit(items)
             }
            }
   ) 

这里emit(items)loadDetails 完成之前调用的问题,所以item.details = details 更新调用。

如何等待forEach更新所有项目?

【问题讨论】:

  • loadDetails 实际上返回一个值流还是基本上只是一个封装在流中的挂起函数?
  • @AdrianK 返回一个流,数据来自 ConflatedBroadcastChannel。事实上,我需要等待 ConflatedBroadcastChannel 返回“成功”数据状态(并忽略“加载”状态)
  • 创建了只返回“成功”数据的包装流。并试图调用 .collect()。还是不行:(

标签: kotlin kotlin-coroutines kotlin-flow


【解决方案1】:

好吧,我在这里做一些假设,所以如果我有任何错误,请在 cmets 中纠正我,我会更新我的答案。

一般来说,如果您不是绝对需要flatMapConcat,则不建议使用它(它实际上写在函数的文档中)

我假设loadDetails 可以简单地表示为:

suspend fun loadDetails(item: Details) = flow<Any>{
    emit(Loading()) // some sort of sealed wrapper class
    val value = someAsyncOperation()
    emit(Success(value))
}

现在我们定义一个简单的辅助函数来获取loadDetails发出的第一个Success

suspend fun simplerLoadDetails(item: Details): Any { 
    // this will collect items of the flow until one matches the lambda
    val wrappedSuccessValue = loadDetails(item)
        .first { it is Success } as Success 
    return wrappedSuccessValue.value
}

另一个处理整个列表

suspend fun populateDetails(items: List<Any>) {
    items.forEach {
        if (it is Details) {
            it.details = simplerLoadDetails(it)
        }  
    }

}

此处注意:如果您需要并行使用,这将按顺序处理所有元素

suspend fun populateDetails(items: List<Any>) {
    coroutineScope {
        items.forEach {
            if (it is Details) {
                launch {
                    it.details = simplerLoadDetails(it)
                }  
            }
        }
    }
}

现在对于loadData 发出的每个元素列表,我们需要做的就是调用populateDetails

suspend fun itemsWithDetails(...) = loadData(...)
    .onEach { itemsList ->
        populateDetails(itemsList)
    }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-09-09
    • 1970-01-01
    • 2012-09-30
    • 1970-01-01
    • 2017-05-03
    • 2020-11-10
    • 1970-01-01
    相关资源
    最近更新 更多