【发布时间】:2016-02-19 22:48:52
【问题描述】:
我创建了一个 akka 流,其中包含一个进程函数和一个错误处理函数。 Source 和 Sink 完全封装在 ClosedShape RunnableFlow 中。我的意图是我将一个项目传递给父类并通过流程运行它。在我开始测试之前,这一切似乎都有效。我正在使用 scala-test 并将附加信息传递给进程函数和错误处理函数中的列表。我随机生成错误以查看事情也流向错误处理函数。问题是如果我将 100 个项目传递给父类,那么我希望错误函数中的项目列表和流程函数中的项目列表加起来为 100。由于 Source 和 Sink 被完全封装,我不没有明确的方法来告诉测试等待,并且在所有项目都通过流处理之前到达断言/应该语句。我创建了this gist 来描述流。
以下是上述要点的示例测试:
import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._
class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("TestSpec"))
override def afterAll = {
Thread.sleep(500)
mat.shutdown()
TestKit.shutdownActorSystem(system)
}
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))
"TestSpec" must {
"handle messages" in {
val testStream = new Testing() // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
(1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on
testStream.errors.size should not be (0) // passes
testStream.processed.size should not be (0) // passes
(testStream.processed.size + testStream.errors.size) should be (100) // fails due to checking before all items are processed
}
}
}
【问题讨论】:
标签: scala scalatest akka-stream