【问题标题】:Difference between map and mapAsyncmap 和 mapAsync 的区别
【发布时间】:2016-05-10 20:12:20
【问题描述】:

谁能解释一下 map 和 mapAsync w.r.t AKKA 流之间的区别? In the documentation据说

涉及外部非流的流转换和副作用 可以使用 mapAsync 或 mapAsyncUnordered 执行基于服务的服务

为什么我们不能简单地在这里映射?我假设 Flow、Source、Sink 本质上都是 Monadic 的,因此 map 应该可以正常工作,w.r.t 这些性质的延迟?

【问题讨论】:

    标签: scala akka-stream


    【解决方案1】:

    签名

    区别在signatures 中最突出:Flow.map 接受一个返回类型T 的函数,而Flow.mapAsync 接受一个返回类型Future[T] 的函数。

    实例

    例如,假设我们有一个函数,它根据用户 ID 查询数据库以获取用户的全名:

    type UserID   = String
    type FullName = String
    
    val databaseLookup : UserID => FullName = ???  //implementation unimportant
    

    给定一个 UserID 的 akka 流 Source 值,我们可以在流中使用 Flow.map 来查询数据库并将全名打印到控制台:

    val userIDSource : Source[UserID, _] = ???
    
    val stream = 
      userIDSource.via(Flow[UserID].map(databaseLookup))
                  .to(Sink.foreach[FullName](println))
                  .run()
    

    这种方法的一个限制是该流一次只能进行 1 个 db 查询。这种串行查询将成为“瓶颈”,并且可能会阻止我们的流中的最大吞吐量。

    我们可以尝试通过使用Future 的并发查询来提高性能:

    def concurrentDBLookup(userID : UserID) : Future[FullName] = 
      Future { databaseLookup(userID) }
    
    val concurrentStream = 
      userIDSource.via(Flow[UserID].map(concurrentDBLookup))
                  .to(Sink.foreach[Future[FullName]](_ foreach println))
                  .run()
    

    这个简单的附录的问题在于我们有效地消除了背压。

    Sink 只是拉入 Future 并添加了一个foreach println,与数据库查询相比,速度相对较快。该流将不断地将需求传播到源并在Flow.map 内产生更多期货。因此,并发运行的databaseLookup 的数量没有限制。不受约束的并行查询最终可能会使数据库过载。

    Flow.mapAsync 救援;我们可以同时访问数据库,同时限制同时查找的数量:

    val maxLookupCount = 10
    
    val maxLookupConcurrentStream = 
      userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
                  .to(Sink.foreach[FullName](println))
                  .run()
    

    还要注意Sink.foreach 变得更简单了,它不再接受Future[FullName],而是只接受FullName

    无序异步映射

    如果不需要将 UserID 保持到 FullName 的顺序,那么您可以使用 Flow.mapAsyncUnordered。例如:您只需将所有名称打印到控制台,而不关心它们的打印顺序。

    【讨论】:

    • mapAsync 是否类似于将异步边界应用于该特定阶段?根据文档,标记异步边界将在actor中运行每个阶段,只是想知道它是否相同。
    • 使用"com.typesafe.akka" %% "akka-stream" % "2.6.3",试过这个例子,编译错误type mismatch; found : akka.stream.scaladsl.Flow[Boolean,Boolean,akka.NotUsed] required: akka.stream.Graph[akka.stream.FlowShape[String,?],?]。如果有人可以为最新版本的 Akka Streams 更新此示例,我将不胜感激
    • @RamonJ 你能解释一下 mapAsync 将在哪个线程上执行吗?它会使用与 graph 相同的调度程序的线程吗(akka 流为它创建一个actor,除非你创建异步边界)?
    • @beinghuman 鉴于mapAsync 不接受ExecutionContext,我认为是的,它使用与图表本身相同的调度程序。但是,我还没有阅读代码来确认是这种情况。
    • @radumanolescu 上述答案中没有任何部分使用Boolean,因此您的错误消息似乎没有使用“this example”。随意问一个stackoverflow问题,我会尝试看看......
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-26
    • 2012-04-21
    • 2017-08-08
    • 2012-07-07
    • 2020-04-02
    • 2013-03-17
    相关资源
    最近更新 更多