【问题标题】:Ensure message order in test when mixing futures with actor messages在将期货与参与者消息混合时确保测试中的消息顺序
【发布时间】:2026-01-19 12:15:02
【问题描述】:

我正在测试一个使用异步基于未来的 API 的演员。当 future 完成时,actor 使用管道模式向自己发送消息:

import akka.pattern.pipe
// ...

// somewhere in the actor's receive method
futureBasedApi.doSomething().pipeTo(self)

在我的测试中,我模拟了 API,以便通过 Promise 控制未来的完成。但是,这与直接发送给参与者的其他消息交错:

myActor ! Message("A")
promiseFromApiCall.success(Message("B"))
myActor ! Message("C")

现在我想知道如何保证演员收到并 在我的测试中处理消息 A 和 C 之间的消息 B 因为消息 B 实际上是在另一个线程中发送的,所以我无法控制顺序 演员的邮箱在其中接收消息。

我想到了几种可能的解决方案:

  • 在每条消息后休眠几毫秒,以制作另一条消息 不太可能下单

  • 等待参与者确认每条消息,尽管 仅在测试时需要确认

  • 直接向actor发送消息B以模拟完成 未来并编写一个单独的测试,以确保管道模式 被正确使用(如果演员愿意,上面的测试不会失败 不将结果消息传递给自身)

我不太喜欢这两个选项,但我倾向于使用最后一个 一。还有其他更好的方法可以在测试中强制执行特定的消息顺序吗?

澄清:问题不在于如何处理消息可能在生产中以随机顺序接收的事实。控制测试中的顺序对于确保参与者可以实际处理不同的消息顺序至关重要。

【问题讨论】:

    标签: scala testing akka actor akka-testkit


    【解决方案1】:

    一个想法是在你的actor中定义一个标志来指示actor是否接收到消息B。当actor接收到消息C时,如果标志为假,actor可以stash那个消息C,然后一旦Actor 接收到消息 B。例如:

    class MyActor extends Actor with Stash {
    
      def receiveBlock(seenMsgB: Boolean, seenMsgC: Boolean): Receive = {
        case MakeApiCall =>
          callExternalApi().mapTo[MessageB].pipeTo(self)
    
        case m: MessageB if seenMsgC => // assume msg C has been stashed
          unstashAll()
          // ...do something with msg B
          become(receiveBlock(true, seenMsgC)) // true, true
        case m: MessageB if !seenMsgC =>
          // ...do something with message B
          become(receiveBlock(true, seenMsgC)) // true, false
    
        case m: MessageC if seenMsgB =>
          // ...do something with message C
          context.become(receiveBlock(seenMsgB, true)) // true, true
        case m: MessageC if !seenMsgB =>
          stash()
          context.become(receiveBlock(seenMsgB, true)) // false, true
    
        case ...
      }
    
      def receive = receiveBlock(false, false)
    }
    

    【讨论】:

    • 抱歉,我的问题可能具有误导性。这不是要让actor在不同的消息顺序方面更加健壮,而是要在测试中产生特定的顺序以确保它可以处理它。我在问题中附加了另一段以说明这一点。
    • 您能否对有状态的参与者使用相同的方法,并将存储用作对被测参与者的所有调用的转发累积代理?所以只要把所有的消息都发给这个代理actor,他的作用就是保证消息B等消息A之后才会转发。
    • @DanilaPolevshikov 问题是当未来完成时,当它向自己发送消息时,参与者不会知道代理。
    • @jeffrey-chung 我终于找到了解决方案并将其发布为答案。感谢您的帮助。
    【解决方案2】:

    在阅读了更多关于 akka 的内容后,我终于找到了一个更好的解决方案:将 actor 邮箱替换为我可以在测试中观察到的邮箱。这样我就可以等到演员在我完成承诺后收到一条新消息。只有这样才能发送下一条消息。这个TestingMailbox的代码在文末给出。

    更新:在 Akka Typed 中,这可以通过 BehaviorInterceptor 非常优雅地实现。只需使用自定义拦截器包装被测Behavior,该拦截器转发所有消息和信号,但让您观察它们。 下面给出无类型 Akka 的邮箱解决方案。


    演员可以这样配置:

    actorUnderTest = system.actorOf(Props[MyActor]).withMailbox("testing-mailbox"))
    

    我必须通过提供配置来确保参与者系统知道“测试邮箱”:

    class MyTest extends TestKit(ActorSystem("some name",
        ConfigFactory.parseString("""{ 
            testing-mailbox = {
                mailbox-type = "my.package.TestingMailbox" 
            }
        }"""))) 
        with BeforeAndAfterAll // ... and so on
    

    设置完成后,我可以像这样更改我的测试:

    myActor ! Message("A")
    val nextMessage = TestingMailbox.nextMessage(actorUnderTest)
    promiseFromApiCall.success(Message("B"))
    Await.ready(nextMessage, 3.seconds)
    myActor ! Message("C")
    

    用一点辅助方法,我什至可以这样写:

    myActor ! Message("A")
    receiveMessageAfter { promiseFromApiCall.success(Message("B")) }
    myActor ! Message("C")
    

    这是我的自定义邮箱:

    import akka.actor.{ActorRef, ActorSystem}
    import akka.dispatch._
    import com.typesafe.config.Config 
    import scala.concurrent.{Future, Promise}
    
    object TestingMailbox {
    
      val promisesByReceiver =
        scala.collection.concurrent.TrieMap[ActorRef, Promise[Any]]()
    
      class MessageQueue extends UnboundedMailbox.MessageQueue {
    
        override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
          super.enqueue(receiver, handle)
          promisesByReceiver.remove(receiver).foreach(_.success(handle.message))
        }
    
      }
    
      def nextMessage(receiver: ActorRef): Future[Any] =
        promisesByReceiver.getOrElseUpdate(receiver, Promise[Any]).future
    
    }
    
    class TestingMailbox extends MailboxType
      with ProducesMessageQueue[TestingMailbox.MessageQueue] {
    
      import TestingMailbox._
    
      def this(settings: ActorSystem.Settings, config: Config) = this()
    
      final override def create(owner: Option[ActorRef],
                                system: Option[ActorSystem]) =
          new MessageQueue()
    
    }
    

    【讨论】:

      【解决方案3】:

      如果订购消息非常重要,您应该使用返回Futureask (?) 并将它们链接起来,即使您不希望从参与者那里得到任何响应。

      【讨论】: