【问题标题】:How to DROP_LATEST with coroutine Flow<T>?如何使用协程 Flow<T> DROP_LATEST?
【发布时间】:2020-11-15 13:13:37
【问题描述】:

请帮忙。我试图这样做:

    2sec     2sec     2sec
------[A]------[B]------[C]------...----------------> InitailFlow
       \        |        | 
        \      drop      drop
         \
     5sec \    5sec        5sec
----------[1]---------[2]---------[3]-----|> AnotherFlow
result: [A1, A2, A3]

所以我有 InitailFlow,它会发出很短的时间(2 秒),然后转换为 AnotherFlow,它需要更长的时间才能完成(总共 15 秒)...我想删除由InitialFlow 而 AnotherFlow 尚未完成...

我试过了:

flow{
    delay(2000)
    emit("A")
    delay(2000)
    emit("B")
    delay(2000)
    emit("C")
}.buffer(0, BufferOverflow.DROP_LATEST)
    .onEach {
       println("Event for $it")
    }
    .flatMapConcat {
       flow {
           delay(5000)
           emit("${it}1")
           delay(5000)
           emit("${it}2")
           delay(5000)
           emit("${it}3")
        }
     }
     .onEach {
         println(it)
     }
     .launchIn(scope)

但由于某种原因,结果如下:

Event for A
A1
A2
A3
Event for B
B1
B2
B3

即使我有一个.buffer(0, BufferOverflow.DROP_LATEST),它仍然会处理Event B

为什么还要处理事件 B?

有没有办法做到这一点?我希望输出只有:

Event for A
A1
A2
A3

提前致谢。

【问题讨论】:

  • 您使用的是哪个协程版本?我相信我使用的是最新的但我找不到.buffer(0, BufferOverflow.DROP_LATEST) 功能...
  • @Deadbeef 1.4.0添加
  • 我并没有真正得到您期望的输出。你能举一个你想要的输出的例子吗?
  • @Deadbeef 它的 1.4.0-M1
  • @marstran 当然!我添加了预期的输出......它也与我的问题中的“图表”相同。所以只是为了澄清一点......只要flatMap中的其他流程不是' t 完成...必须删除所有其他传入事件..如果 flatMap flow 完成,它可以再次接受另一个事件...

标签: kotlin kotlin-coroutines


【解决方案1】:

这应该适合你:

fun <T> Flow<T>.dropIfBusy(): Flow<T> = flow {
    coroutineScope {
        val channel = produce(capacity = Channel.RENDEZVOUS) {
            collect { offer(it) }
        }
        channel.consumeEach { emit(it) }
    }
}

这基本上是来自kotlin docs 的“幼稚”buffer 实现
这里唯一的区别是我们使用channel.offer 而不是channel.send 当与 RENDEZVOUS 通道一起使用时,提供给通道的所有值(在暂停时)都会被丢弃,从而创建您想要的行为。

【讨论】:

  • 还没有尝试过,但如果这可行,这不应该是.buffer(RENDEZVOUS, DROP_LATEST) 的实现吗?因为 RENDEZVOUS == 0.
  • 我自己最初也是这么想的,但是文档声明“要实现任一自定义策略 [DROP_OLDEST/DROP_LATEST],至少使用一个元素的缓冲区。”
  • 聪明的解决方案,但这也不起作用。我得到这个输出:Event for A, Event for B, Event for C, C1, C2, C3。我认为这是因为此函数将在继续使用flatMapConcat 之前从上游流中发出所有事件。
  • 您是否将代码中的.buffer(0, BufferOverflow.DROP_LATEST) 替换为dropIfBusy()?我使用它得到了预期的输出
  • 如果这仍然不起作用,它可能取决于上下文。 scope 使用什么 Dispatcher?
【解决方案2】:

在玩了一会儿@AdrianK 的解决方案之后,我实际上找到了一个使用channelFlow 的更简单的解决方案。由于channelFlow 目前是实验性 API,您必须选择使用它。

像这样:

fun <T> Flow<T>.dropIfBusy(): Flow<T> = channelFlow {
    collect { offer(it) }
}.buffer(0)

【讨论】:

    【解决方案3】:

    每次调用 emit 时都会暂停,因此 buffer 运算符无效。

    我不确定您期望的输出,但我认为您正在寻找 conflate 运算符:

    flow{
        delay(2000)
        emit("A")
        delay(2000)
        emit("B")
        delay(2000)
        emit("C")
    }
    .conflate()
    .onEach {
        println("Event for $it")
    }
    .flatMapConcat {
        flow {
            delay(5000)
            emit("${it}1")
            delay(5000)
            emit("${it}2")
            delay(5000)
            emit("${it}3")
        }
    }
    .onEach {
        println(it)
    }
    .launchIn(scope)
    

    conflate 运营商官方文档指出:

    通过合并通道合并流量排放并在一个集合中运行收集器 单独的协程。这样做的效果是发射器永远不会 由于收集器速度慢而暂停,但收集器总是得到最多 最近发出的值。

    您可以找到conflate运营商官方文档here

    【讨论】:

    • 正如上面评论中提到的,使用conflate 并不能实现OP想要做的事情。
    • conflate 不起作用.. 不会删除 LATEST 值.. 它会删除 OLDEST... 这是文档:"/** * 请求 Channel(...) 中的合并频道工厂函数。这是使用 [onBufferOverflow = DROP_OLDEST][BufferOverflow.DROP_OLDEST] 创建通道的快捷方式。*/"
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-29
    • 2022-01-09
    • 2021-07-14
    • 2021-11-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多