【问题标题】:unit testing callbackFlow单元测试回调流
【发布时间】:2022-01-04 09:22:35
【问题描述】:

我有一个这样的基于回调的 API:

  class CallbackApi {
    fun addListener(callback: Callback) {
      // todo
    }

    fun removeListener(callback: Callback) {
      // todo
    }

    interface Callback {
      fun onResult(result: Int)
    }
  }

以及将 API 转换为 hot 冷流的扩展函数:

  fun CallbackApi.toFlow() = callbackFlow<Int> {
    val callback = object : CallbackApi.Callback {
      override fun onResult(result: Int) {
        trySendBlocking(result)
      }
    }
    addListener(callback)
    awaitClose { removeListener(callback) }
  }

您是否介意建议如何编写单元测试以确保 API 正确转换为热流?

这是我的尝试。经过反复试验,我想出了这个解决方案。

  @Test
  fun callbackFlowTest() = runBlocking {
    val callbackApi = mockk<CallbackApi>()
    val callbackSlot = slot<CallbackApi.Callback>()
    every { callbackApi.addListener(capture(callbackSlot)) } just Runs
    every { callbackApi.removeListener(any()) } just Runs
    val list = mutableListOf<Int>()
    val flow: Flow<Int> = callbackApi.toFlow().onEach { list.add(it) }
    val coroutineScope = CoroutineScope(this.coroutineContext + SupervisorJob())
    flow.launchIn(coroutineScope)
    yield()
    launch {
      callbackSlot.captured.onResult(10)
      callbackApi.removeListener(mockk()) // this was a misunderstanding
    }.join()
    assert(list.single() == 10)
  }

但我不明白这个解决方案的两个部分。

1- 如果没有SupervisorJob(),测试似乎永远不会结束。也许出于某种原因收集流量永远不会结束,我不明白。我在一个单独的协程中提供捕获的回调。

2- 如果我删除 callbackSlot.captured.onResult(10) 位于其中的 launch 主体,测试将失败并出现此错误 UninitializedPropertyAccessException: lateinit property captured has not been initialized。我认为yield 应该启动流程。

【问题讨论】:

    标签: unit-testing kotlin kotlin-coroutines


    【解决方案1】:

    以及将 API 转换为热流的扩展函数

    此扩展程序看起来正确,但流程不热(也不应该如此)。它仅在实际收集开始时注册回调,并在收集器取消时取消注册(这包括当收集器使用限制项目数量的终端运算符时,例如 .first().take(n))。

    对于您的其他问题,这是一个非常重要的注意事项。

    在没有该 SupervisorJob() 的情况下,测试似乎永远不会结束。可能出于某种原因收集流量永远不会结束,我不明白

    如上所述,由于流的构造方式(以及CallbackApi 的工作方式),流收集不能由生产者(回调API)决定结束。它只能通过取消收集器来停止,这也会取消注册相应的回调(这很好)。

    您的自定义作业允许测试结束的原因可能是因为您通过在上下文中由没有当前作业作为父作业的自定义作业覆盖上下文中的作业来逃避结构化并发。但是,您可能仍然会从永不取消的范围中泄漏那个永无止境的协程。

    我在一个单独的协程中提供捕获的回调。

    这是正确的,虽然我不明白你为什么从这个单独的协程中调用removeListener。你在这里注销什么回调?请注意,这也不会对流程产生任何影响,因为即使您可以取消注册在 callbackFlow 构建器中创建的回调,它也不会神奇地关闭 callbackFlow 的通道,因此流程不会无论如何都要结束(我假设这是你在这里尝试做的)。

    此外,从外部取消注册回调会阻止您检查它实际上是否被您的生产代码取消注册。

    2- 如果我删除其中 callbackSlot.captured.onResult(10) 的启动体,测试将失败并出现此错误 UninitializedPropertyAccessException: lateinit property capture has not been initialized。我认为 yield 应该开始流动。

    yield() 非常脆弱。如果你使用它,你必须非常清楚每个并发协程的代码当前是如何编写的。它脆弱的原因是它只会将线程让给其他协程,直到下一个暂停点。你无法预测屈服时会执行哪个协程,也无法预测线程到达挂起点后会恢复哪个协程。如果有几个停赛,所有赌注都取消。如果还有其他正在运行的协程,那么所有的赌注也都没有了。

    更好的方法是使用kotlinx-coroutines-test,它提供了像advanceUntilIdle 这样的实用程序,可以确保其他协程都完成或等待暂停点。


    现在如何解决这个测试?我现在无法测试任何东西,但我可能会这样处理:

    • 使用 kotlinx-coroutines-test 中的 runTest 而不是 runBlocking 以更好地控制其他协程何时运行(并等待流集合执行某些操作)
    • 在协程中启动流收集(只是launch/launchIn(this),没有自定义范围)并保留启动的Job的句柄(返回值为launch/launchIn
    • 用一个值调用捕获的回调,advanceUntilIdle() 以确保流收集器的协程可以处理它,然后断言列表获得了元素(注意:由于一切都是单线程的并且回调没有挂起,这将如果没有缓冲区则死锁,但callbackFlow 使用默认缓冲区,所以应该没问题)
    • 可选:用不同的值重复以上几次,并确认它们被流收集
    • 取消收集作业,advanceUntilIdle(),然后测试回调是否未注册(我不是 Mockk 专家,但应该检查一下是否调用了 removeListener

    注意:也许我是老派,但如果您的 CallbackApi 是一个接口(在您的示例中它是一个类,但我不确定它在多大程度上反映了现实),我宁愿实现一个模拟手动使用通道来模拟事件和断言期望。我发现推理和调试更容易。这里是an example of what I mean

    【讨论】:

    • 我发现您的指南非常有用,谢谢@Joffrey 我会根据这些发布答案。
    • 我的意思是输入cold flow 而不是hot flow!对不起!
    【解决方案2】:

    这是我根据@Joffrey 有用指南找到的解决方案:

      @Test
      fun callbackFlowTestSolution() = runTest {
        val callbackApi = mockk<CallbackApi>()
        val callbackSlot = slot<CallbackApi.Callback>()
        every { callbackApi.addListener(capture(callbackSlot)) } just Runs
        every { callbackApi.removeListener(any()) } just Runs
        val itemsToSend = Array(9) { index -> index } // [0, 1, 2, 3, 4, 5, 6, 7, 8]
        val collectedItems = mutableListOf<Int>()
    
        val flow: Flow<Int> = callbackApi.toFlow().onEach { collectedItems.add(it) }
        val collectJob = launch { flow.collect() }
        advanceUntilIdle() // wait for callbackFlow builder to call addListener
        itemsToSend.forEach { callbackSlot.captured.onResult(it) }
        advanceUntilIdle() // wait for flow collection
        collectJob.cancel()
        advanceUntilIdle() // wait for awaitClose
    
        verify { callbackApi.removeListener(callbackSlot.captured) }
        assertArrayEquals(itemsToSend.toIntArray(), collectedItems.toIntArray())
      }
    

    请考虑升级到kotlinx-coroutines-test:1.6.0 以使用runTest。在旧版本中,我们可以使用runBlockingTest,但它已被弃用。


    更新: 如果要检查流是否可以逐个收集项目

      @Test
      fun callbackFlowTestSolution() = runTest {
        val callbackApi = mockk<CallbackApi>()
        val callbackSlot = slot<CallbackApi.Callback>()
        every { callbackApi.addListener(capture(callbackSlot)) } just Runs
        every { callbackApi.removeListener(any()) } just Runs
        val itemsToSend = Array(9) { index -> index } // [0, 1, 2, 3, 4, 5, 6, 7, 8]
        var lastCollectedItem: Int? = null
    
        val flow: Flow<Int> = callbackApi.toFlow().onEach { lastCollectedItem = it }
        val collectJob = launch { flow.collect() }
        advanceUntilIdle() // wait for callbackFlow builder to call addListener
        itemsToSend.forEach {
          callbackSlot.captured.onResult(it)
          advanceUntilIdle() // wait for flow collection
          
          assertEquals(it, lastCollectedItem)
        }
        collectJob.cancel()
        advanceUntilIdle() // wait for awaitClose
    
        verify { callbackApi.removeListener(callbackSlot.captured) }
      }
    

    【讨论】:

    • 这几乎就是我的想法,感谢您将其放入代码中。唯一缺少的部分是在调用回调后检查元素实际上是否在列表中。这样你就可以检查流程是否真的可以一件一件的收集物品,不需要构建完整的列表
    • 嗯听起来不错。我已经更新了解决方案来解决这个问题。
    猜你喜欢
    • 2017-05-09
    • 2017-03-19
    • 2018-01-20
    • 2018-07-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-09
    • 2017-12-22
    相关资源
    最近更新 更多