【问题标题】:Result of map and mapAsync(1) in Akka StreamAkka Stream 中 map 和 mapAsync(1) 的结果
【发布时间】:2016-03-19 08:59:52
【问题描述】:

我使用mapAync(1) 的代码不能按我的意愿工作。但是当我通过使用Await.resultmapAsync(1) 更改为map 时,它可以工作。所以我有一个问题。 以下(A) Use map(B) use mapAsync(1) 在任何时候都会产生相同的结果吗?

// (A) Use map
someSource
 .map{r => 
   val future = makeFuture(r) // returns the same future if r is the same
   Await.result(future, Duration.Inf)
 }

// (B) Use mapAsync(1)
someSource
 .mapAsync(1){r =>
   val future = makeFuture(r) // returns the same future if r is the same
   future
}

其实我是想粘贴我的真实代码,但是粘贴时间太长,而且对我的原始阶段有一些依赖。

【问题讨论】:

  • mapAsync 本身返回一个Future[T],我认为实际上不需要用makeFuture 创造的另一个未来来包装它。
  • 非常感谢您的回复。对不起,但我无法理解你所说的。我认为Source[O, Mat]#mapAsync[T](par: Int)(f: O => Future[T]) 返回Source[T, Mat],而不是Future[T]
  • makeFuture 的代码是什么样的,r 的类型是什么?
  • 对不起我的无良代码。在我的真实代码中,r 的类型是akka.util.ByteString,但我认为类型可以是任何东西。
  • 感谢您和其他回答者,我的代码确实按照我的意愿工作。在制作复现代码的过程中,我收集了我的错误。错误出现在我原来的 GraphStage 中。非常感谢!

标签: scala akka akka-stream


【解决方案1】:

虽然语义上两个流的类型最终相同 (Source[Int, NotUsed]),但示例 (A) 中显示的样式非常糟糕 - 请不要在流中阻塞 (Await)。

这种情况正是mapAsync 的用例。您的操作返回一个Future[T],并且您希望在未来完成后将该值向下推送到流中。请注意,mapAsync 中没有阻塞,它会安排一个回调来在内部推送未来的值,并且一旦完成就会这样做。

要回答您关于“他们做同样的事情吗?”的问题,技术上是的,但第一个问题会导致您正在运行的线程池中出现性能问题,当 mapAsync 可以完成这项工作时,请避免 map+blocking。

【讨论】:

  • 非常感谢您的回复。我想使用mapAsync,但在我的特定代码中mapAsync 不起作用。所以,我用mapAwait替换了mapAsync(A)。然后我的代码工作。但我不明白为什么替换会影响结果。为了知道原因,我问。我想粘贴代码,但是太长了。所以我会尝试用短代码来制作这个现象。
  • 我认为您的代码中肯定还有其他问题,mapAsync 可能在这里没有错。请尝试寻找复制者
  • 是的,你是对的! mapAsync 没有错。感谢您和其他回答者,我的代码确实按照我的意愿工作。在制作复现代码的过程中,我收集了我的错误。错误出现在我原来的 GraphStage 中。非常感谢!
【解决方案2】:

这些调用在语义上非常相似,尽管使用Await 进行阻塞可能不是一个好主意。当然,这两个调用的类型签名是相同的(Source[Int, NotUsed]),并且在许多情况下,这些调用会产生相同的结果(阻塞)。例如,以下内容包括预定的期货和针对失败的非默认监督策略,对于内部带有 Await 的 map 和 mapAsync 给出相同的结果:

import akka.actor._
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision.resumingDecider
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps

object Main {

  def main(args: Array[String]) {

    implicit val system = ActorSystem("TestSystem")
    implicit val materializer = ActorMaterializer()
    import scala.concurrent.ExecutionContext.Implicits.global
    import system.scheduler

    def makeFuture(r: Int) = {
      akka.pattern.after(2 seconds, scheduler) {
        if (r % 3 == 0)
          Future.failed(new Exception(s"Failure for input $r"))
        else
          Future(r + 100)
      }
    }

    val someSource = Source(1 to 20)

    val mapped = someSource
      .map { r =>
        val future = makeFuture(r)
        Await.result(future, Duration.Inf)
      }.withAttributes(supervisionStrategy(resumingDecider))

    val mappedAsync = someSource
      .mapAsyncUnordered(1) { r =>
        val future = makeFuture(r)
        future
      }.withAttributes(supervisionStrategy(resumingDecider))

    mapped runForeach println
    mappedAsync runForeach println

  }

}

您的上游代码可能以某种方式依赖于 map 调用中的阻塞行为。您能否对您所看到的问题进行简明的复制?

【讨论】:

  • 感谢您的回复。 > 你能对你所看到的问题进行简明的复制吗?实际上,在问之前,我尝试过对问题代码进行简单的复现,但我做不到。但我会重新尝试制作一个。
  • 感谢您和其他回答者,我的代码确实按照我的意愿工作。在制作复现代码的过程中,我收集了我的错误。错误出现在我原来的GraphStage 中。非常感谢!
猜你喜欢
  • 2016-05-10
  • 2018-04-28
  • 2017-04-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-06-21
相关资源
最近更新 更多