【问题标题】:Akka streams: dealing with futures within graph stageAkka 流:在图阶段处理期货
【发布时间】:2018-02-19 03:40:34
【问题描述】:

在 akka 流阶段 FlowShape[A, B] 中,我需要在 A 上执行的部分处理是使用使用 A 数据构建的查询来保存/查询数据存储。但是该数据存储驱动程序查询给了我一个未来,我不确定如何最好地处理它(我的主要问题在这里)。

case class Obj(a: String, b: Int, c: String)
case class Foo(myobject: Obj, name: String)
case class Bar(st: String)
//
class SaveAndGetId extends GraphStage[FlowShape[Foo, Bar]] {
 val dao = new DbDao // some dao with an async driver 

 override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
  setHandlers(in, out, new InHandler with Outhandler {
   override def onPush() = {
    val foo = grab(in)
    val add = foo.record.value()
    val result: Future[String] = dao.saveAndGetRecord(add.myobject)//saves and returns id as string

   //the naive approach
    val record = Await(result, Duration.inf)
    push(out, Bar(record))// ***tests pass every time

  //mapping the future approach
    result.map { x=>
     push(out, Bar(x))
    } //***tests fail every time

下一阶段取决于查询返回的db记录的id,但我想避免Await。我不确定为什么映射方法会失败:

"it should work" in {
  val source = Source.single(Foo(Obj("hello", 1, "world")))
  val probe = source
    .via(new SaveAndGetId))
    .runWith(TestSink.probe)
  probe
   .request(1)
   .expectBarwithId("one")//say we know this will be
   .expectComplete()
 }
 private implicit class RichTestProbe(probe: Probe[Bar]) {
  def expectBarwithId(expected: String): Probe[Bar] = 
   probe.expectNextChainingPF{
    case r @ Bar(str) if str == expected => r
  }
 }

当使用映射未来运行时,我会失败:

should work ***FAILED***
java.lang.AssertionError: assertion failed: expected: message matching partial function but got unexpected message OnComplete
at scala.Predef$.assert(Predef.scala:170)
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:406)
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:814)
at akka.stream.testkit.TestSubscriber$ManualProbe.expectEventPF(StreamTestKit.scala:570)

docs 中的异步侧通道示例在阶段的构造函数中具有未来,而不是在阶段内构建未来,因此似乎不适用于我的情况。

【问题讨论】:

标签: scala akka akka-stream


【解决方案1】:

我认为您的GraphStage 过于复杂了。下面的Flow 执行相同的操作,无需编写自定义阶段:

val dao = new DbDao

val parallelism = 10 //number of parallel db queries

val SaveAndGetId : Flow[Foo, Bar, _] = 
  Flow[Foo]
    .map(foo => foo.record.value().myobject)
    .mapAsync(parallelism)(rec => dao.saveAndGetRecord(rec))
    .map(Bar.apply)

我通常尝试将GraphStage 视为最后的手段,几乎总是有一种惯用的方法可以通过使用 akka-stream 库提供的方法来获得相同的 Flow。

【讨论】:

    【解决方案2】:

    我同意拉蒙的观点。在这种情况下,不需要构造一个新的FlowShape,而且它太复杂了。这里使用mapAsync方法非常方便:

    这里是使用mapAsync的代码sn-p:

    import akka.stream.scaladsl.{Sink, Source}
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    
    object MapAsyncExample {
    
      val numOfParallelism: Int = 10
    
      def main(args: Array[String]): Unit = {
        Source.repeat(5)  
          .mapAsync(parallelism)(x => asyncSquare(x))           
          .runWith(Sink.foreach(println)) previous stage
      }
    
      //This method returns a Future
      //You can replace this part with your database operations
      def asyncSquare(value: Int): Future[Int] = Future {
        value * value
      }
    }
    

    在上面的 sn-p 中,Source.repeat(5) 是一个虚拟源,可以无限期地发出 5。有一个示例函数asyncSquare 接受integer 并在Future 中计算其平方。 .mapAsync(parallelism)(x => asyncSquare(x)) 行使用该函数并将Future 的输出发送到下一阶段。在这个片段中,下一个阶段是sink,它会打印每个项目。

    parallelism 是可以同时运行的asyncSquare 调用的最大数量。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-06-22
      • 1970-01-01
      • 1970-01-01
      • 2016-09-06
      • 2013-04-12
      • 1970-01-01
      相关资源
      最近更新 更多