【问题标题】:Sink fold for akka stream Source.actorRef buffer and OverflowStrategyakka 流 Source.actorRef 缓冲区和溢出策略的接收器折叠
【发布时间】:2017-08-30 02:29:39
【问题描述】:

这是来自akka documentation的代码sn-p

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
  .toMat(sinkUnderTest)(Keep.both).run()

ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")

val result = Await.result(future, 3.seconds)
assert(result == "123")

这是一个工作代码 sn-p,但是,如果我使用 ref 告诉另一个消息,如 ref ! 4 ,我会得到一个像 akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3) 这样的异常

我想缓冲区大小 3 应该足够了。原因是折叠操作是(acc, ele) => acc,所以需要累加器和元素返回新的值累加器。

所以我更改了代码让另一个演员告诉等待 3 秒。它又开始工作了。

  val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

  private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()

  ref ! 1
  ref ! 2
  ref ! 3
  Thread.sleep(3000)
  ref ! 4
  ref ! akka.actor.Status.Success("done")

  val result = Await.result(future, 10.seconds)

  println(result)

但是,我的问题是,有没有一种方法可以告诉 Akka 流放慢速度或等待接收器可用。我也在用OverflowStrategy.backpressure,但是上面写着Backpressure overflowStrategy not supported

还有其他选择吗?

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    您应该将Source.queue 视为一种以背压感知方式将元素从外部注入流的方法。

    Source.queue 将具体化为您可以提供元素的队列对象,但是当您提供它们时,您将返回一个 Future,当流准备好接受消息时完成。

    下面的例子:

      val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
    
      val (queue, future): (SourceQueueWithComplete[Int], Future[String]) =
        Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
    
      Future.sequence(Seq(
        queue.offer(1),
        queue.offer(2),
        queue.offer(3),
        queue.offer(4)
      ))
    
      queue.complete()
    
      val result = Await.result(future, 10.seconds)
    
      println(result)
    

    更多信息请关注docs

    【讨论】:

    • 感谢您的回复。您能否也提供代码 sn-p 。我对来自Source.queueEnqueuedoffer 的东西有点困惑
    • 添加示例
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-11-30
    • 2021-06-11
    • 1970-01-01
    • 1970-01-01
    • 2018-07-26
    • 1970-01-01
    • 2015-12-16
    相关资源
    最近更新 更多