【发布时间】:2018-02-19 03:40:34
【问题描述】:
在 akka 流阶段 FlowShape[A, B] 中,我需要在 A 上执行的部分处理是使用使用 A 数据构建的查询来保存/查询数据存储。但是该数据存储驱动程序查询给了我一个未来,我不确定如何最好地处理它(我的主要问题在这里)。
case class Obj(a: String, b: Int, c: String)
case class Foo(myobject: Obj, name: String)
case class Bar(st: String)
//
class SaveAndGetId extends GraphStage[FlowShape[Foo, Bar]] {
val dao = new DbDao // some dao with an async driver
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
setHandlers(in, out, new InHandler with Outhandler {
override def onPush() = {
val foo = grab(in)
val add = foo.record.value()
val result: Future[String] = dao.saveAndGetRecord(add.myobject)//saves and returns id as string
//the naive approach
val record = Await(result, Duration.inf)
push(out, Bar(record))// ***tests pass every time
//mapping the future approach
result.map { x=>
push(out, Bar(x))
} //***tests fail every time
下一阶段取决于查询返回的db记录的id,但我想避免Await。我不确定为什么映射方法会失败:
"it should work" in {
val source = Source.single(Foo(Obj("hello", 1, "world")))
val probe = source
.via(new SaveAndGetId))
.runWith(TestSink.probe)
probe
.request(1)
.expectBarwithId("one")//say we know this will be
.expectComplete()
}
private implicit class RichTestProbe(probe: Probe[Bar]) {
def expectBarwithId(expected: String): Probe[Bar] =
probe.expectNextChainingPF{
case r @ Bar(str) if str == expected => r
}
}
当使用映射未来运行时,我会失败:
should work ***FAILED***
java.lang.AssertionError: assertion failed: expected: message matching partial function but got unexpected message OnComplete
at scala.Predef$.assert(Predef.scala:170)
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:406)
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:814)
at akka.stream.testkit.TestSubscriber$ManualProbe.expectEventPF(StreamTestKit.scala:570)
docs 中的异步侧通道示例在阶段的构造函数中具有未来,而不是在阶段内构建未来,因此似乎不适用于我的情况。
【问题讨论】:
-
你可以使用
getAsyncCallack -
我会使用 mapAsync。检查stackoverflow.com/questions/35146418/…这个
标签: scala akka akka-stream