【问题标题】:How to test an akka stream closed shape runnable graph with an encapsulated source and sink如何使用封装的源和接收器测试 akka 流封闭形状可运行图
【发布时间】:2016-02-19 22:48:52
【问题描述】:

我创建了一个 akka 流,其中包含一个进程函数和一个错误处理函数。 SourceSink 完全封装在 ClosedShape RunnableFlow 中。我的意图是我将一个项目传递给父类并通过流程运行它。在我开始测试之前,这一切似乎都有效。我正在使用 scala-test 并将附加信息传递给进程函数和错误处理函数中的列表。我随机生成错误以查看事情也流向错误处理函数。问题是如果我将 100 个项目传递给父类,那么我希望错误函数中的项目列表和流程函数中的项目列表加起来为 100。由于 Source 和 Sink 被完全封装,我不没有明确的方法来告诉测试等待,并且在所有项目都通过流处理之前到达断言/应该语句。我创建了this gist 来描述流。

以下是上述要点的示例测试:

import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._

class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
    with WordSpecLike with Matchers with BeforeAndAfterAll {
  def this() = this(ActorSystem("TestSpec"))

  override def afterAll = {
    Thread.sleep(500)
    mat.shutdown()
    TestKit.shutdownActorSystem(system)
  }

  implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))

  "TestSpec" must {
    "handle messages" in {
      val testStream = new Testing()                                                 // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
      (1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on

      testStream.errors.size should not be (0)                                       // passes
      testStream.processed.size should not be (0)                                    // passes
      (testStream.processed.size + testStream.errors.size) should be (100)           // fails due to checking before all items are processed
    }
  }
}

【问题讨论】:

    标签: scala scalatest akka-stream


    【解决方案1】:

    根据 Viktor Klang 对链接 Gist 的评论。事实证明这是一个很好的解决方案:

    def consume(
        errorHandler: BadData => Unit, fn: Data => Unit, a: String
      ): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
        GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Unit] => sink =>
          import GraphDSL.Implicits._
    
          val source = b.add(Source.single(a))
          val broadcast = b.add(Broadcast[String](2))
          val merge = b.add(Zip[String, String])
          val process = new ProcessorFlow(fn)
          val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
          val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
            (input: Xor[BadData, Data]) =>
              input.swap.getOrElse((new Throwable, ("", "")))
          ))
    
          source ~> broadcast.in
                    broadcast.out(0) ~> Flow[String].map(_.reverse)       ~> merge.in0
                    broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
                                                                             merge.out ~> process ~> failed ~> errors ~> sink
    
          ClosedShape
        }
      )
    

    这允许我在 RunnableGraph 上 Await.result 进行测试。再次感谢 Viktor 提供的解决方案!

    【讨论】:

    • 不错的解决方案。虽然它确实有效,但我不明白为什么。您能否解释一下提供接收器作为参数如何将 Graph Mat 类型从 NotUsed 变为 Future[Done]? IIUC,Sink 应该是GraphApply#createg1: Graph[Shape, Mat] 还是我在这里误解了什么?
    • 确认,这是有效的,作为参数传递给GraphDSL.create 的接收器更改了返回类型并允许等待图表完成。这看起来像一个 hack,我很惊讶没有更直观的解决方案可以做到这一点。有人知道另一种方法吗?
    • 之所以有效,是因为 Sink.foreach 为您做 Mat def foreach[T](f: T => Unit): Sink[T, Future[Done]] = Flow[T].map (f).toMat(Sink.ignore)(Keep.right).named("foreachSink")
    猜你喜欢
    • 2016-09-12
    • 2022-11-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-04-10
    • 2013-03-02
    • 1970-01-01
    • 2013-10-13
    相关资源
    最近更新 更多